[ASTERIXDB-2796][RT] Fix IndexOutOfBoundsException in hash group-by

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
IndexOutOfBoundsException happens in hash group-by when WITH
clause exists. The variables from the WITH clause get propagated
through the decor variables of the group-by. The number of output
keys are the sum of both the group-by keys and decor variables, but
the hash functions & comparators used are only for the group-by keys.

- pass only the aggregated keys to the comparators and hash functions.

Change-Id: I19dc652872a1f030b6afa509711e7e0700e86856
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8743
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
new file mode 100644
index 0000000..7d2c396
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description  : Testing that runtime of hash-group by handles decor variables
+ * Expected     : SUCCESS
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {id: int};
+CREATE DATASET ds1(t1) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
new file mode 100644
index 0000000..e79903f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ds1 {"id": 1, "dept_id": 1, "salary": 10, "area": "N"};
+INSERT INTO ds1 {"id": 2, "dept_id": 1, "salary": 10, "area": "N"};
+INSERT INTO ds1 {"id": 3, "dept_id": 1, "salary": 10, "area": "A"};
+INSERT INTO ds1 {"id": 4, "dept_id": 2, "salary": 20, "area": "N"};
+INSERT INTO ds1 {"id": 5, "dept_id": 2, "salary": 20, "area": "N"};
+INSERT INTO ds1 {"id": 6, "dept_id": 2, "salary": 20, "area": "A"};
+INSERT INTO ds1 {"id": 7, "dept_id": 3, "salary": 30, "area": "N"};
+INSERT INTO ds1 {"id": 8, "dept_id": 3, "salary": 30, "area": "N"};
+INSERT INTO ds1 {"id": 9, "dept_id": 3, "salary": 30, "area": "A"};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
new file mode 100644
index 0000000..39cd32f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+WITH a AS "N"
+FROM ds1 v
+WHERE v.area = a
+/*+ hash */
+GROUP BY v.dept_id
+SELECT v.dept_id, SUM(v.salary) AS total, a
+ORDER BY v.dept_id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
new file mode 100644
index 0000000..c79ed9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
@@ -0,0 +1,3 @@
+{ "a": "N", "dept_id": 1, "total": 20 }
+{ "a": "N", "dept_id": 2, "total": 40 }
+{ "a": "N", "dept_id": 3, "total": 60 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6d56e70..adbdf69 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5808,6 +5808,11 @@
         <output-dir compare="Text">group-by-all-ASTERIXDB-2611</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="group-by">
+      <compilation-unit name="hash-group-by-decor">
+        <output-dir compare="Text">hash-group-by-decor</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index-join">
     <test-case FilePath="index-join">
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 945e36c..7515258 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -123,8 +123,8 @@
         GroupByOperator gby = (GroupByOperator) op;
         checkGroupAll(gby);
         List<LogicalVariable> gbyCols = getGroupByColumns();
-        int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
-        int fdColumns[] = getFdColumns(gby, inputSchemas[0]);
+        int[] gbyColumns = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
+        int[] fdColumns = getFdColumns(gby, inputSchemas[0]);
 
         if (gby.getNestedPlans().size() != 1) {
             throw new AlgebricksException(
@@ -161,14 +161,6 @@
                     .add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv, context.getMetadataProvider()));
         }
 
-        int[] keyAndDecFields = new int[keys.length + fdColumns.length];
-        for (i = 0; i < keys.length; ++i) {
-            keyAndDecFields[i] = keys[i];
-        }
-        for (i = 0; i < fdColumns.length; i++) {
-            keyAndDecFields[keys.length + i] = fdColumns[i];
-        }
-
         List<LogicalVariable> keyAndDecVariables = new ArrayList<LogicalVariable>();
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gby.getGroupByList()) {
             keyAndDecVariables.add(p.first);
@@ -225,15 +217,14 @@
         // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
         int frameSize = context.getFrameSize();
         long memoryBudgetInBytes = localMemoryRequirements.getMemoryBudgetInBytes(frameSize);
-        int numFds = gby.getDecorList().size();
-        int groupByColumnsCount = gby.getGroupByList().size() + numFds;
+        int allColumns = gbyColumns.length + fdColumns.length;
         int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
-                groupByColumnsCount, frameSize);
+                allColumns, frameSize);
 
         int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
         long inputSize = framesLimit * (long) frameSize;
         ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
-                keyAndDecFields, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
+                gbyColumns, fdColumns, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
                 mergeFactory, recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
         gbyOpDesc.setSourceLocation(gby.getSourceLocation());
         contributeOpDesc(builder, gby, gbyOpDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index 4287b26..0261106 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -146,7 +146,7 @@
         try {
             context.getPlanStabilityVerifier().recordPlanSignature(opRef);
         } catch (AlgebricksException e) {
-            throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+            throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
                     String.format("Illegal state before rule %s. %s", rule.getClass().getName(), e.getMessage()));
         }
     }
@@ -157,7 +157,7 @@
             try {
                 context.getPlanStructureVerifier().verifyPlanStructure(opRef);
             } catch (AlgebricksException e) {
-                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
                         String.format("Fired rule %s produced illegal %s", rule.getClass().getName(), e.getMessage()));
             }
         } else {
@@ -168,7 +168,7 @@
                     printRuleApplication(rule, "not fired, but failed sanity check: " + e.getMessage(), beforePlan,
                             getPlanString(opRef));
                 }
-                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
                         String.format("Non-fired rule %s unexpectedly %s", rule.getClass().getName(), e.getMessage()));
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index ab06488..f93d1f9 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -128,7 +128,7 @@
                     context.getPlanStructureVerifier().verifyPlanStructure(opRef);
                 }
             } catch (AlgebricksException e) {
-                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+                throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
                         String.format("Initial plan contains illegal %s", e.getMessage()));
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
index 3c468be..32dca00 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
@@ -126,12 +126,12 @@
         for (LogicalVariable opProducedVar : opProducedVars) {
             LogicalVariable leftBranchProducedVar = leftBranchProducedVarMap.get(opProducedVar);
             if (leftBranchProducedVar == null) {
-                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE, op.getSourceLocation(),
                         "Cannot find " + opProducedVar);
             }
             LogicalVariable rightBranchProducedVar = rightBranchProducedVarMap.get(opProducedVar);
             if (rightBranchProducedVar == null) {
-                throw AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE, op.getSourceLocation(),
+                throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE, op.getSourceLocation(),
                         "Cannot find " + opProducedVar);
             }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a93da48..db60f1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -159,6 +159,7 @@
     public static final int INSUFFICIENT_MEMORY = 123;
     public static final int PARSING_ERROR = 124;
     public static final int INVALID_INVERTED_LIST_TYPE_TRAITS = 125;
+    public static final int ILLEGAL_STATE = 126;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
@@ -170,7 +171,6 @@
     public static final int INAPPLICABLE_HINT = 10006;
     public static final int CROSS_PRODUCT_JOIN = 10007;
     public static final int GROUP_ALL_DECOR = 10008;
-    public static final int COMPILATION_ILLEGAL_STATE = 10009;
 
     private static class Holder {
         private static final Map<Integer, String> errorMessageMap;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 01895a6..c01f39a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -143,6 +143,7 @@
 123 = Insufficient memory is provided for the join operators, please increase the join memory budget.
 124 = Parsing error at %1$s line %2$s field %3$s: %4$s
 125 = Invalid inverted list type traits: %1$s
+126 = Illegal state. %1$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
@@ -152,5 +153,4 @@
 10005 = Operator is not implemented: %1$s
 10006 = Could not apply %1$s hint: %2$s
 10007 = Encountered a cross product join
-10008 = Inappropriate use of group by all with decor variables
-10009 = Illegal state. %1$s
\ No newline at end of file
+10008 = Inappropriate use of group by all with decor variables
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 3232527..4f0c304 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -66,7 +66,7 @@
 
     @Override
     public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx, int suggestTableSize,
-            long inputDataBytesSize, final int[] keyFields, final IBinaryComparator[] comparators,
+            long inputDataBytesSize, final int[] gbyFields, final int[] fdFields, final IBinaryComparator[] comparators,
             final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
             RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
             final int seed) throws HyracksDataException {
@@ -79,26 +79,47 @@
             throw new HyracksDataException("The given frame limit is too small to partition the data.");
         }
 
-        final int[] intermediateResultKeys = new int[keyFields.length];
-        for (int i = 0; i < keyFields.length; i++) {
-            intermediateResultKeys[i] = i;
+        final int[] intermediateResultGbyFields = new int[gbyFields.length];
+        for (int i = 0; i < gbyFields.length; i++) {
+            intermediateResultGbyFields[i] = i;
+        }
+
+        final int[] allFields;
+        final int[] intermediateResultAllFields;
+        if (fdFields == null) {
+            // no need to combine gby and fd
+            allFields = gbyFields;
+            intermediateResultAllFields = intermediateResultGbyFields;
+        } else {
+            allFields = new int[gbyFields.length + fdFields.length];
+            intermediateResultAllFields = new int[gbyFields.length + fdFields.length];
+            int k = 0;
+            int position = 0;
+            for (int i = 0; i < gbyFields.length; i++, position++, k++) {
+                allFields[k] = gbyFields[i];
+                intermediateResultAllFields[k] = position;
+            }
+            for (int i = 0; i < fdFields.length; i++, position++, k++) {
+                allFields[k] = fdFields[i];
+                intermediateResultAllFields[k] = position;
+            }
         }
 
         final FrameTuplePairComparator ftpcInputCompareToAggregate =
-                new FrameTuplePairComparator(keyFields, intermediateResultKeys, comparators);
+                new FrameTuplePairComparator(gbyFields, intermediateResultGbyFields, comparators);
 
         final ITuplePartitionComputer tpc =
-                new FieldHashPartitionComputerFamily(keyFields, hashFunctionFamilies).createPartitioner(seed);
+                new FieldHashPartitionComputerFamily(gbyFields, hashFunctionFamilies).createPartitioner(seed);
 
         // For calculating hash value for the already aggregated tuples (not incoming tuples)
         // This computer is required to calculate the hash value of a aggregated tuple
         // while doing the garbage collection work on Hash Table.
         final ITuplePartitionComputer tpcIntermediate =
-                new FieldHashPartitionComputerFamily(intermediateResultKeys, hashFunctionFamilies)
+                new FieldHashPartitionComputerFamily(intermediateResultGbyFields, hashFunctionFamilies)
                         .createPartitioner(seed);
 
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, intermediateResultKeys, null, -1);
+                outRecordDescriptor, allFields, intermediateResultAllFields, null, -1);
 
         final AggregateState aggregateState = aggregator.createAggregateStates();
 
@@ -225,8 +246,8 @@
 
             private void initStateTupleBuilder(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
                 stateTupleBuilder.reset();
-                for (int k = 0; k < keyFields.length; k++) {
-                    stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
+                for (int k = 0; k < allFields.length; k++) {
+                    stateTupleBuilder.addField(accessor, tIndex, allFields[k]);
                 }
                 aggregator.init(stateTupleBuilder, accessor, tIndex, aggregateState);
             }
@@ -246,10 +267,10 @@
                         hashTableForTuplePointer.getTuplePointer(hashEntryPid, tid, pointer);
                         bufferAccessor.reset(pointer);
                         outputTupleBuilder.reset();
-                        for (int k = 0; k < intermediateResultKeys.length; k++) {
+                        for (int k = 0; k < intermediateResultAllFields.length; k++) {
                             outputTupleBuilder.addField(bufferAccessor.getBuffer().array(),
-                                    bufferAccessor.getAbsFieldStartOffset(intermediateResultKeys[k]),
-                                    bufferAccessor.getFieldLength(intermediateResultKeys[k]));
+                                    bufferAccessor.getAbsFieldStartOffset(intermediateResultAllFields[k]),
+                                    bufferAccessor.getFieldLength(intermediateResultAllFields[k]));
                         }
 
                         boolean hasOutput = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
index 629f211..c60b29a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -27,9 +27,11 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ISpillableTableFactory extends Serializable {
+
     ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize,
-            int[] keyFields, IBinaryComparator[] comparatorFactories, INormalizedKeyComputer firstKeyNormalizerFactory,
-            IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int framesLimit, int seed) throws HyracksDataException;
+            int[] gbyFields, int[] fdFields, IBinaryComparator[] comparatorFactories,
+            INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int framesLimit, int seed)
+            throws HyracksDataException;
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 20d223e..02cee04 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
@@ -47,7 +48,8 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final IHyracksTaskContext ctx;
     private final Object stateId;
-    private final int[] keyFields;
+    private final int[] gbyFields;
+    private final int[] fdFields; // nullable
     private final IBinaryComparator[] comparators;
     private final INormalizedKeyComputer firstNormalizerComputer;
     private final IAggregatorDescriptorFactory aggregatorFactory;
@@ -63,15 +65,19 @@
     private boolean isFailed = false;
 
     public ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int tableSize, long fileSize,
-            int[] keyFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
+            int[] gbyFields, int[] fdFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
             INormalizedKeyComputerFactory firstNormalizerFactory, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor,
-            ISpillableTableFactory spillableTableFactory) {
+            ISpillableTableFactory spillableTableFactory) throws HyracksDataException {
+        if (comparatorFactories.length != gbyFields.length) {
+            throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE, "mismatch in group by fields and comparators");
+        }
         this.ctx = ctx;
         this.stateId = stateId;
         this.framesLimit = framesLimit;
         this.aggregatorFactory = aggregatorFactory;
-        this.keyFields = keyFields;
+        this.gbyFields = gbyFields;
+        this.fdFields = fdFields;
         this.comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -88,7 +94,7 @@
     @Override
     public void open() throws HyracksDataException {
         state = new ExternalGroupState(ctx.getJobletContext().getJobId(), stateId);
-        ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, tableSize, fileSize, keyFields,
+        ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, tableSize, fileSize, gbyFields, fdFields,
                 comparators, firstNormalizerComputer, aggregatorFactory, inRecordDescriptor, outRecordDescriptor,
                 framesLimit, INIT_SEED);
         RunFileWriter[] runFileWriters = new RunFileWriter[table.getNumPartitions()];
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 6dea186..7e23ac3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -44,11 +44,11 @@
 
     private static final int MERGE_ACTIVITY_ID = 1;
 
-    private static final long serialVersionUID = 1L;
-    private final int[] keyFields;
+    private static final long serialVersionUID = 2L;
+    private final int[] gbyFields;
+    private final int[] fdFields; // nullable
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
     private final IAggregatorDescriptorFactory partialAggregatorFactory;
     private final IAggregatorDescriptorFactory intermediateAggregateFactory;
 
@@ -60,7 +60,7 @@
     private final long fileSize;
 
     public ExternalGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputSizeInTuple, long inputFileSize,
-            int[] keyFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
+            int[] gbyFields, int[] fdFields, int framesLimit, IBinaryComparatorFactory[] comparatorFactories,
             INormalizedKeyComputerFactory firstNormalizerFactory, IAggregatorDescriptorFactory partialAggregatorFactory,
             IAggregatorDescriptorFactory intermediateAggregateFactory, RecordDescriptor partialAggRecordDesc,
             RecordDescriptor outRecordDesc, ISpillableTableFactory spillableTableFactory) {
@@ -76,7 +76,8 @@
         }
         this.partialAggregatorFactory = partialAggregatorFactory;
         this.intermediateAggregateFactory = intermediateAggregateFactory;
-        this.keyFields = keyFields;
+        this.gbyFields = gbyFields;
+        this.fdFields = fdFields;
         this.comparatorFactories = comparatorFactories;
         this.firstNormalizerFactory = firstNormalizerFactory;
         this.spillableTableFactory = spillableTableFactory;
@@ -93,13 +94,6 @@
         this.fileSize = inputFileSize;
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see
-     * org.apache.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
-     * (org.apache.hyracks.api.dataflow.IActivityGraphBuilder)
-     */
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
         AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
@@ -126,7 +120,7 @@
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
             return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), tableSize,
-                    fileSize, keyFields, framesLimit, comparatorFactories, firstNormalizerFactory,
+                    fileSize, gbyFields, fdFields, framesLimit, comparatorFactories, firstNormalizerFactory,
                     partialAggregatorFactory, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
                     outRecDescs[0], spillableTableFactory);
         }
@@ -145,8 +139,8 @@
                 throws HyracksDataException {
             return new ExternalGroupWriteOperatorNodePushable(ctx,
                     new TaskId(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID), partition),
-                    spillableTableFactory, partialRecDesc, outRecDesc, framesLimit, keyFields, firstNormalizerFactory,
-                    comparatorFactories, intermediateAggregateFactory);
+                    spillableTableFactory, partialRecDesc, outRecDesc, framesLimit, gbyFields, fdFields,
+                    firstNormalizerFactory, comparatorFactories, intermediateAggregateFactory);
 
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index e8a1b76..1a6f4ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -42,6 +43,7 @@
 
 public class ExternalGroupWriteOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable
         implements IRunFileWriterGenerator {
+
     private static final Logger LOGGER = LogManager.getLogger();
     private final IHyracksTaskContext ctx;
     private final Object stateId;
@@ -49,7 +51,8 @@
     private final RecordDescriptor partialAggRecordDesc;
     private final RecordDescriptor outRecordDesc;
     private final IAggregatorDescriptorFactory mergeAggregatorFactory;
-    private final int[] mergeGroupFields;
+    private final int[] gbyFields;
+    private final int[] fdFields; // nullable
     private final IBinaryComparator[] groupByComparators;
     private final int frameLimit;
     private final INormalizedKeyComputer nmkComputer;
@@ -57,29 +60,38 @@
 
     public ExternalGroupWriteOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
             ISpillableTableFactory spillableTableFactory, RecordDescriptor partialAggRecordDesc,
-            RecordDescriptor outRecordDesc, int framesLimit, int[] groupFields,
+            RecordDescriptor outRecordDesc, int framesLimit, int[] gbyFields, int[] fdFields,
             INormalizedKeyComputerFactory nmkFactory, IBinaryComparatorFactory[] comparatorFactories,
-            IAggregatorDescriptorFactory aggregatorFactory) {
+            IAggregatorDescriptorFactory aggregatorFactory) throws HyracksDataException {
+        if (comparatorFactories.length != gbyFields.length) {
+            throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE, "mismatch in group by fields and comparators");
+        }
         this.ctx = ctx;
         this.stateId = stateId;
         this.spillableTableFactory = spillableTableFactory;
         this.frameLimit = framesLimit;
         this.nmkComputer = nmkFactory == null ? null : nmkFactory.createNormalizedKeyComputer();
-
         this.partialAggRecordDesc = partialAggRecordDesc;
         this.outRecordDesc = outRecordDesc;
-
         this.mergeAggregatorFactory = aggregatorFactory;
 
         //create merge group fields
-        int numGroupFields = groupFields.length;
-        mergeGroupFields = new int[numGroupFields];
-        for (int i = 0; i < numGroupFields; i++) {
-            mergeGroupFields[i] = i;
+        this.gbyFields = new int[gbyFields.length];
+        int position = 0;
+        for (int i = 0; i < this.gbyFields.length; i++, position++) {
+            this.gbyFields[i] = position;
+        }
+        if (fdFields != null) {
+            this.fdFields = new int[fdFields.length];
+            for (int i = 0; i < this.fdFields.length; i++, position++) {
+                this.fdFields[i] = position;
+            }
+        } else {
+            this.fdFields = null;
         }
 
         //setup comparators for grouping
-        groupByComparators = new IBinaryComparator[Math.min(mergeGroupFields.length, comparatorFactories.length)];
+        groupByComparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < groupByComparators.length; i++) {
             groupByComparators[i] = comparatorFactories[i].createBinaryComparator();
         }
@@ -122,12 +134,12 @@
             if (runs[i] != null) {
                 // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
                 int memoryBudgetInBytes = ctx.getInitialFrameSize() * frameLimit;
-                int groupByColumnsCount = mergeGroupFields.length;
-                int hashTableCardinality = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(
-                        memoryBudgetInBytes, groupByColumnsCount, ctx.getInitialFrameSize());
+                int allFields = gbyFields.length + (fdFields == null ? 0 : fdFields.length);
+                int hashTableCardinality = ExternalGroupOperatorDescriptor
+                        .calculateGroupByTableCardinality(memoryBudgetInBytes, allFields, ctx.getInitialFrameSize());
                 hashTableCardinality = Math.min(hashTableCardinality, numOfTuples[i]);
                 ISpillableTable partitionTable = spillableTableFactory.buildSpillableTable(ctx, hashTableCardinality,
-                        runs[i].getFileSize(), mergeGroupFields, groupByComparators, nmkComputer,
+                        runs[i].getFileSize(), gbyFields, fdFields, groupByComparators, nmkComputer,
                         mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, frameLimit, level);
                 RunFileWriter[] runFileWriters = new RunFileWriter[partitionTable.getNumPartitions()];
                 int[] sizeInTuplesNextLevel =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
index 794ff98..067c9a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
@@ -33,8 +33,8 @@
 
         // Sets a dummy variable.
         IOperatorDescriptorRegistry spec = new JobSpecification(32768);
-        ExternalGroupOperatorDescriptor eGByOp =
-                new ExternalGroupOperatorDescriptor(spec, 0, 0, null, 4, null, null, null, null, null, null, null);
+        ExternalGroupOperatorDescriptor eGByOp = new ExternalGroupOperatorDescriptor(spec, 0, 0, null, null, 4, null,
+                null, null, null, null, null, null);
 
         // Test 1: compiler.groupmemory: 512 bytes, frame size: 256 bytes, with 1 column group-by
         long memoryBudgetInBytes = 512;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index 0a57232..ae718bb 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -163,17 +163,18 @@
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
-                keyFields, frameLimits, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
-                        new FloatSumFieldAggregatorFactory(5, false) }),
-                new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
-                        new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
-                        new FloatSumFieldAggregatorFactory(3, false) }),
-                outputRec, outputRec, new HashSpillableTableFactory(
-                        new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+        ExternalGroupOperatorDescriptor grouper =
+                new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
+                        new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                        new UTF8StringNormalizedKeyComputerFactory(),
+                        new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+                                new FloatSumFieldAggregatorFactory(5, false) }),
+                        new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+                                new FloatSumFieldAggregatorFactory(3, false) }),
+                        outputRec, outputRec, new HashSpillableTableFactory(
+                                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -253,7 +254,7 @@
         long fileSize = frameLimits * spec.getFrameSize();
 
         ExternalGroupOperatorDescriptor grouper =
-                new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, frameLimits,
+                new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
                         new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                         new UTF8StringNormalizedKeyComputerFactory(),
                         new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
@@ -341,17 +342,18 @@
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
-                keyFields, frameLimits, new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(15, true, true) }),
-                new MultiFieldsAggregatorFactory(
-                        new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
-                                new MinMaxStringFieldAggregatorFactory(2, true, true) }),
-                outputRec, outputRec, new HashSpillableTableFactory(
-                        new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+        ExternalGroupOperatorDescriptor grouper =
+                new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
+                        new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                        new UTF8StringNormalizedKeyComputerFactory(),
+                        new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                        new MinMaxStringFieldAggregatorFactory(15, true, true) }),
+                        new MultiFieldsAggregatorFactory(
+                                new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false),
+                                        new MinMaxStringFieldAggregatorFactory(2, true, true) }),
+                        outputRec, outputRec, new HashSpillableTableFactory(
+                                new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -432,7 +434,7 @@
         long fileSize = frameLimits * spec.getFrameSize();
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
-                keyFields, frameLimits,
+                keyFields, null, frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
                         UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
@@ -527,7 +529,7 @@
         long fileSize = frameLimits * spec.getFrameSize();
 
         ExternalGroupOperatorDescriptor grouper =
-                new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, frameLimits,
+                new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize, keyFields, null, frameLimits,
                         new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
                                 UTF8StringBinaryComparatorFactory.INSTANCE },
                         new UTF8StringNormalizedKeyComputerFactory(),
@@ -624,7 +626,7 @@
         long fileSize = frameLimits * spec.getFrameSize();
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
-                keyFields, frameLimits,
+                keyFields, null, frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
                         UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index c286e52..5a8ec34 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -126,7 +126,7 @@
         int tableSize = 8;
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
-                keyFields, fileSize / spec.getFrameSize() + 1,
+                keyFields, null, fileSize / spec.getFrameSize() + 1,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
@@ -191,7 +191,7 @@
         int tableSize = 8;
 
         ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
-                keyFields, fileSize / spec.getFrameSize() + 1,
+                keyFields, null, fileSize / spec.getFrameSize() + 1,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                 new UTF8StringNormalizedKeyComputerFactory(),
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
index f1a4231..75ecc34 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
@@ -34,14 +35,14 @@
     ExternalGroupWriteOperatorNodePushable mergeOperator;
 
     @Override
-    protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) {
+    protected void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException {
         ISpillableTableFactory tableFactory = new HashSpillableTableFactory(
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE });
         buildOperator = new ExternalGroupBuildOperatorNodePushable(ctx, this.hashCode(), tableSize,
-                numFrames * ctx.getInitialFrameSize(), keyFields, numFrames, comparatorFactories,
+                numFrames * ctx.getInitialFrameSize(), keyFields, null, numFrames, comparatorFactories,
                 normalizedKeyComputerFactory, partialAggrInPlace, inRecordDesc, outputRec, tableFactory);
         mergeOperator = new ExternalGroupWriteOperatorNodePushable(ctx, this.hashCode(), tableFactory, outputRec,
-                outputRec, numFrames, keyFieldsAfterPartial, normalizedKeyComputerFactory, comparatorFactories,
+                outputRec, numFrames, keyFieldsAfterPartial, null, normalizedKeyComputerFactory, comparatorFactories,
                 finalAggrInPlace);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index cf04bee..2bc742a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -153,7 +153,7 @@
         IOperatorDescriptor gBy;
         int[] keys = new int[] { 0 };
         if ("hash".equalsIgnoreCase(algo)) {
-            gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+            gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, null, frameLimit,
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                     new UTF8StringNormalizedKeyComputerFactory(),
                     new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
index 72660c0..a14b44d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -148,7 +148,7 @@
         AbstractOperatorDescriptor grouper;
 
         if (alg.equalsIgnoreCase("hash")) {// external hash graph
-            grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+            grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, null, frameLimit,
                     new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
                     new IntegerNormalizedKeyComputerFactory(),
                     new MultiFieldsAggregatorFactory(
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 209bf34..3bbeca8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -221,7 +221,7 @@
                     new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
 
             ExternalGroupOperatorDescriptor gby = new ExternalGroupOperatorDescriptor(spec, tableSize,
-                    custFileSize + orderFileSize, new int[] { 6 }, memSize,
+                    custFileSize + orderFileSize, new int[] { 6 }, null, memSize,
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                     new UTF8StringNormalizedKeyComputerFactory(),
                     new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {