diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index efffda2..4314b3a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -167,10 +167,11 @@
                                         serialAggExpr.setSourceLocation(expr.getSourceLocation());
                                         aggOp.getExpressions().get(i).setValue(serialAggExpr);
                                     }
-                                    ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
-                                            gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                                            (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                                                    * physicalOptimizationConfig.getFrameSize());
+                                    ExternalGroupByPOperator externalGby =
+                                            new ExternalGroupByPOperator(gby.getGroupByVarList(),
+                                                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
+                                                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+                                                            * physicalOptimizationConfig.getFrameSize());
                                     generateMergeAggregationExpressions(gby, context);
                                     op.setPhysicalOperator(externalGby);
                                     setToExternalGby = true;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 5d19134..404a8dc 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -31,7 +31,6 @@
     EXTERNAL_GROUP_BY,
     EXTERNAL_LOOKUP,
     FORWARD,
-    HASH_GROUP_BY,
     HASH_PARTITION_EXCHANGE,
     HASH_PARTITION_MERGE_EXCHANGE,
     HDFS_READER,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index 49bf062..0b280f4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -89,7 +89,7 @@
         return gByList;
     }
 
-    public List<LogicalVariable> getGbyVarList() {
+    public List<LogicalVariable> getGroupByVarList() {
         List<LogicalVariable> varList = new ArrayList<>(gByList.size());
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {
             ILogicalExpression expr = ve.second.getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 8535204..64d1b61 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -114,7 +114,7 @@
         }
         ILogicalOperator inputOp = op.getInputs().get(0).getValue();
         long inputCardinality = inputOp.accept(this, arg);
-        List<LogicalVariable> gbyVar = op.getGbyVarList();
+        List<LogicalVariable> gbyVar = op.getGroupByVarList();
         if (inputCardinality == ONE_GROUP && keyVariables.containsAll(gbyVar)) {
             keyVariables.clear();
             return ONE;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index f5f5f96..eaac45d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -388,7 +388,7 @@
         if (op1.getOperatorTag() == LogicalOperatorTag.GROUP) {
             GroupByOperator gby = (GroupByOperator) op1;
             LinkedList<LogicalVariable> tail = new LinkedList<LogicalVariable>();
-            for (LogicalVariable v : gby.getGbyVarList()) {
+            for (LogicalVariable v : gby.getGroupByVarList()) {
                 tail.add(v);
                 // all values for gby vars. are the same
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractDistinctByPOperator.java
new file mode 100644
index 0000000..2352338
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractDistinctByPOperator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+
+public abstract class AbstractDistinctByPOperator extends AbstractPhysicalOperator {
+
+    protected List<LogicalVariable> columnList;
+
+    protected AbstractDistinctByPOperator(List<LogicalVariable> columnList) {
+        this.columnList = columnList;
+    }
+
+    public List<LogicalVariable> getDistinctByColumns() {
+        return columnList;
+    }
+
+    public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
+        this.columnList = distinctByColumns;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+
+    protected int[] getKeysAndDecs(IOperatorSchema inputSchema) {
+        int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchema);
+        int sz = inputSchema.getSize();
+        int fdSz = sz - columnList.size();
+        int[] fdColumns = new int[fdSz];
+        int j = 0;
+        for (LogicalVariable v : inputSchema) {
+            if (!columnList.contains(v)) {
+                fdColumns[j++] = inputSchema.findVariable(v);
+            }
+        }
+        int[] keysAndDecs = new int[keys.length + fdColumns.length];
+        for (int i = 0; i < keys.length; i++) {
+            keysAndDecs[i] = keys[i];
+        }
+        for (int i = 0; i < fdColumns.length; i++) {
+            keysAndDecs[i + keys.length] = fdColumns[i];
+        }
+        return keysAndDecs;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
new file mode 100644
index 0000000..ce6dedc
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator {
+
+    protected List<LogicalVariable> columnList;
+
+    protected final int framesLimit;
+
+    protected AbstractGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+        this.columnList = columnList;
+        this.framesLimit = framesLimit;
+    }
+
+    public List<LogicalVariable> getGroupByColumns() {
+        return columnList;
+    }
+
+    public void setGroupByColumns(List<LogicalVariable> columnList) {
+        this.columnList = columnList;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + columnList;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
index c604e5c..a81bf97 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreSortedDistinctByPOperator.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -38,18 +37,11 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 
-public abstract class AbstractPreSortedDistinctByPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractPreSortedDistinctByPOperator extends AbstractDistinctByPOperator {
 
-    protected List<LogicalVariable> columnList;
-
-    public AbstractPreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
-        this.columnList = columnList;
-    }
-
-    public void setDistinctByColumns(List<LogicalVariable> distinctByColumns) {
-        this.columnList = distinctByColumns;
+    protected AbstractPreSortedDistinctByPOperator(List<LogicalVariable> columnList) {
+        super(columnList);
     }
 
     @Override
@@ -78,31 +70,4 @@
         pv[0] = new StructuralPropertiesVector(pp, localProps);
         return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
-
-    protected int[] getKeysAndDecs(IOperatorSchema inputSchema) {
-        int keys[] = JobGenHelper.variablesToFieldIndexes(columnList, inputSchema);
-        int sz = inputSchema.getSize();
-        int fdSz = sz - columnList.size();
-        int[] fdColumns = new int[fdSz];
-        int j = 0;
-        for (LogicalVariable v : inputSchema) {
-            if (!columnList.contains(v)) {
-                fdColumns[j++] = inputSchema.findVariable(v);
-            }
-        }
-        int[] keysAndDecs = new int[keys.length + fdColumns.length];
-        for (int i = 0; i < keys.length; i++) {
-            keysAndDecs[i] = keys[i];
-        }
-        for (int i = 0; i < fdColumns.length; i++) {
-            keysAndDecs[i + keys.length] = fdColumns[i];
-        }
-        return keysAndDecs;
-    }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index c18d76c..23a411c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -57,25 +57,10 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 
-public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
+public abstract class AbstractPreclusteredGroupByPOperator extends AbstractGroupByPOperator {
 
-    protected List<LogicalVariable> columnList;
-
-    public AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
-        this.columnList = columnList;
-    }
-
-    @Override
-    public String toString() {
-        return getOperatorTag().toString() + columnList;
-    }
-
-    public List<LogicalVariable> getGbyColumns() {
-        return columnList;
-    }
-
-    public void setGbyColumns(List<LogicalVariable> gByColumns) {
-        this.columnList = gByColumns;
+    protected AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+        super(columnList, framesLimit);
     }
 
     // Obs: We don't propagate properties corresponding to decors, since they
@@ -170,7 +155,7 @@
                     IPhysicalOperator pop2 = op2.getPhysicalOperator();
                     if (pop2 instanceof AbstractPreclusteredGroupByPOperator) {
                         List<LogicalVariable> gbyColumns =
-                                ((AbstractPreclusteredGroupByPOperator) pop2).getGbyColumns();
+                                ((AbstractPreclusteredGroupByPOperator) pop2).getGroupByColumns();
                         List<LogicalVariable> sndOrder = new ArrayList<>();
                         sndOrder.addAll(gbyColumns);
                         Set<LogicalVariable> freeVars = new HashSet<>();
@@ -230,7 +215,7 @@
                 }
                 List<FunctionalDependency> fdList = new ArrayList<>();
                 for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : gby.getDecorList()) {
-                    List<LogicalVariable> hd = gby.getGbyVarList();
+                    List<LogicalVariable> hd = gby.getGroupByVarList();
                     List<LogicalVariable> tl = new ArrayList<>();
                     tl.add(((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference());
                     fdList.add(new FunctionalDependency(hd, tl));
@@ -301,18 +286,13 @@
         return null;
     }
 
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
-
     // Returns the local structure property that is propagated from an input local structure property
     // through a pre-clustered GROUP BY physical operator.
     private ILocalStructuralProperty getPropagatedProperty(ILocalStructuralProperty lsp, GroupByOperator gby) {
         PropertyType propertyType = lsp.getPropertyType();
         if (propertyType == PropertyType.LOCAL_GROUPING_PROPERTY) {
             // A new grouping property is generated.
-            return new LocalGroupingProperty(new ListSet<>(gby.getGbyVarList()));
+            return new LocalGroupingProperty(new ListSet<>(gby.getGroupByVarList()));
         } else {
             LocalOrderProperty lop = (LocalOrderProperty) lsp;
             List<OrderColumn> orderColumns = new ArrayList<>();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 652196d..927beae5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -66,28 +66,13 @@
 import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 
-public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
+public class ExternalGroupByPOperator extends AbstractGroupByPOperator {
 
     private final long inputSize;
-    private final int frameLimit;
-    private List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
 
-    public ExternalGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
-            long fileSize) {
-        this.frameLimit = frameLimit;
+    public ExternalGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, long fileSize) {
+        super(columnList, framesLimit);
         this.inputSize = fileSize;
-        computeColumnSet(gbyList);
-    }
-
-    public void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
-        columnSet.clear();
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
-            ILogicalExpression expr = p.second.getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression v = (VariableReferenceExpression) expr;
-                columnSet.add(v.getVariableReference());
-            }
-        }
     }
 
     @Override
@@ -96,19 +81,10 @@
     }
 
     @Override
-    public String toString() {
-        return getOperatorTag().toString() + columnSet;
-    }
-
-    @Override
     public boolean isMicroOperator() {
         return false;
     }
 
-    public List<LogicalVariable> getGbyColumns() {
-        return columnSet;
-    }
-
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
@@ -138,7 +114,7 @@
         if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(
-                    new ListSet<LogicalVariable>(columnSet), context.getComputationNodeDomain()), null);
+                    new ListSet<LogicalVariable>(columnList), context.getComputationNodeDomain()), null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
@@ -149,7 +125,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        List<LogicalVariable> gbyCols = getGbyColumns();
+        List<LogicalVariable> gbyCols = getGroupByColumns();
         int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
         GroupByOperator gby = (GroupByOperator) op;
         int numFds = gby.getDecorList().size();
@@ -262,14 +238,14 @@
                 JobGenHelper.variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
 
         // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
-        int memoryBudgetInBytes = context.getFrameSize() * frameLimit;
+        int memoryBudgetInBytes = context.getFrameSize() * framesLimit;
         int groupByColumnsCount = gby.getGroupByList().size() + numFds;
         int hashTableSize = ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
                 groupByColumnsCount, context.getFrameSize());
 
         ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
-                keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
-                recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
+                keyAndDecFields, framesLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory,
+                mergeFactory, recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
         gbyOpDesc.setSourceLocation(gby.getSourceLocation());
         contributeOpDesc(builder, gby, gbyOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
@@ -282,10 +258,4 @@
         int[] outputDependencyLabels = new int[] { 1 };
         return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
     }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
index fda879c..109b4f7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreSortedDistinctByPOperator.java
@@ -73,7 +73,7 @@
 
         /* make fd columns part of the key but the comparator only compares the distinct key columns */
         MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keysAndDecs,
-                comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+                comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null, -1);
         runtime.setSourceLocation(op.getSourceLocation());
         builder.contributeMicroOperator(op, runtime, recordDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
index 13308a1..350bcfb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroPreclusteredGroupByPOperator.java
@@ -39,8 +39,8 @@
 
 public class MicroPreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
 
-    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
-        super(columnList);
+    public MicroPreclusteredGroupByPOperator(List<LogicalVariable> columnList, int framesLimit) {
+        super(columnList, framesLimit);
     }
 
     @Override
@@ -73,11 +73,10 @@
         RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
         MicroPreClusteredGroupRuntimeFactory runtime = new MicroPreClusteredGroupRuntimeFactory(keys,
-                comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null);
+                comparatorFactories, aggregatorFactory, inputRecordDesc, recordDescriptor, null, framesLimit);
         runtime.setSourceLocation(gby.getSourceLocation());
         builder.contributeMicroOperator(gby, runtime, recordDescriptor);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index 7971e78..1f8451c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -72,7 +72,7 @@
         if (originalLocalProperties != null) {
             newLocalProperties = new ArrayList<>();
             for (ILocalStructuralProperty lsp : originalLocalProperties) {
-                ILocalStructuralProperty groupLocalLsp = lsp.regardToGroup(gby.getGbyVarList());
+                ILocalStructuralProperty groupLocalLsp = lsp.regardToGroup(gby.getGroupByVarList());
                 if (groupLocalLsp != null) {
                     // Adds the property that is satisfied in the context of a particular group.
                     newLocalProperties.add(groupLocalLsp);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index dd4c65f..61fc51f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -72,7 +72,7 @@
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         /* make fd columns part of the key but the comparator only compares the distinct key columns */
         PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keysAndDecs,
-                comparatorFactories, aggregatorFactory, recordDescriptor);
+                comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
         opDesc.setSourceLocation(op.getSourceLocation());
 
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index e5076ce..b6faa36 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -43,12 +43,10 @@
 public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator {
 
     private final boolean groupAll;
-    private final int framesLimit;
 
     public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) {
-        super(columnList);
+        super(columnList, framesLimit);
         this.groupAll = groupAll;
-        this.framesLimit = framesLimit;
     }
 
     @Override
@@ -101,5 +99,4 @@
     public String toString() {
         return getOperatorTag().toString() + (groupAll ? "(ALL)" : "") + columnList;
     }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index f2a0b71..af8161f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -65,28 +65,13 @@
 import org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
 
-public class SortGroupByPOperator extends AbstractPhysicalOperator {
+public class SortGroupByPOperator extends AbstractGroupByPOperator {
 
-    private final int frameLimit;
     private final OrderColumn[] orderColumns;
-    private final List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
 
-    public SortGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
-            OrderColumn[] orderColumns) {
-        this.frameLimit = frameLimit;
+    public SortGroupByPOperator(List<LogicalVariable> columnList, int framesLimit, OrderColumn[] orderColumns) {
+        super(columnList, framesLimit);
         this.orderColumns = orderColumns;
-        computeColumnSet(gbyList);
-    }
-
-    private void computeColumnSet(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList) {
-        columnSet.clear();
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
-            ILogicalExpression expr = p.second.getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression v = (VariableReferenceExpression) expr;
-                columnSet.add(v.getVariableReference());
-            }
-        }
     }
 
     @Override
@@ -95,19 +80,10 @@
     }
 
     @Override
-    public String toString() {
-        return getOperatorTag().toString() + columnSet;
-    }
-
-    @Override
     public boolean isMicroOperator() {
         return false;
     }
 
-    private List<LogicalVariable> getGbyColumns() {
-        return columnSet;
-    }
-
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
@@ -145,7 +121,7 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        List<LogicalVariable> gbyCols = getGbyColumns();
+        List<LogicalVariable> gbyCols = getGroupByColumns();
         int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols, inputSchemas[0]);
         GroupByOperator gby = (GroupByOperator) op;
         int numFds = gby.getDecorList().size();
@@ -273,7 +249,7 @@
         normalizedKeyFactory =
                 orderColumns[0].getOrder() == OrderKind.ASC ? nkcfProvider.getNormalizedKeyComputerFactory(type, true)
                         : nkcfProvider.getNormalizedKeyComputerFactory(type, false);
-        SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, frameLimit, keys,
+        SortGroupByOperatorDescriptor gbyOpDesc = new SortGroupByOperatorDescriptor(spec, framesLimit, keys,
                 keyAndDecFields, normalizedKeyFactory, compFactories, aggregatorFactory, mergeFactory,
                 partialAggRecordDescriptor, recordDescriptor, false);
         gbyOpDesc.setSourceLocation(gby.getSourceLocation());
@@ -289,9 +265,4 @@
         int[] outputDependencyLabels = new int[] { 1 };
         return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
     }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return true;
-    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index 66ee453..61a843c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -43,13 +43,13 @@
     private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
     private final ILogicalPlan plan;
 
-    private static final PhysicalOperatorTag[] hyracksOperators = new PhysicalOperatorTag[] {
-            PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
-            PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
-            PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
-            PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
-            PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE, PhysicalOperatorTag.STABLE_SORT,
-            PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD };
+    private static final PhysicalOperatorTag[] hyracksOperators =
+            new PhysicalOperatorTag[] { PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
+                    PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
+                    PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+                    PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
+                    PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.REPLICATE,
+                    PhysicalOperatorTag.STABLE_SORT, PhysicalOperatorTag.UNION_ALL, PhysicalOperatorTag.FORWARD };
     public static final PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
 
     public HeuristicOptimizer(ILogicalPlan plan,
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index be26702..d9746b2 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -140,7 +140,7 @@
         Map<String, Object> annotations = newGbyOp.getAnnotations();
         annotations.putAll(gbyOp.getAnnotations());
 
-        List<LogicalVariable> gbyVars = gbyOp.getGbyVarList();
+        List<LogicalVariable> gbyVars = gbyOp.getGroupByVarList();
 
         // Backup nested plans since tryToPushSubplan(...) may mutate them.
         List<ILogicalPlan> gbyNestedPlans = gbyOp.getNestedPlans();
@@ -306,7 +306,7 @@
             return true;
         } else {
             GroupByOperator nestedGby = (GroupByOperator) op3;
-            List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+            List<LogicalVariable> gbyVars2 = nestedGby.getGroupByVarList();
             Set<LogicalVariable> freeVars = new HashSet<>();
             // Removes non-free variables defined in the nested plan.
             OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(nestedGby, freeVars);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
index 19dc21e..4368d39 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
@@ -59,7 +59,7 @@
         if (!groupOp.isGroupAll()) {
             return false;
         }
-        List<LogicalVariable> groupVars = groupOp.getGbyVarList();
+        List<LogicalVariable> groupVars = groupOp.getGroupByVarList();
         List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = groupOp.getDecorList();
         if (!groupVars.isEmpty() || !decorList.isEmpty()) {
             return false;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 7dc596c..67c1f1d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -59,12 +59,11 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -387,18 +386,12 @@
     private void optimizeUsingConstraintsAndEquivClasses(AbstractLogicalOperator op) {
         IPhysicalOperator pOp = op.getPhysicalOperator();
         switch (pOp.getOperatorTag()) {
-            case HASH_GROUP_BY:
-            case EXTERNAL_GROUP_BY: {
-                GroupByOperator gby = (GroupByOperator) op;
-                ExternalGroupByPOperator hgbyOp = (ExternalGroupByPOperator) pOp;
-                hgbyOp.computeColumnSet(gby.getGroupByList());
-                break;
-            }
+            case EXTERNAL_GROUP_BY:
             case PRE_CLUSTERED_GROUP_BY:
             case MICRO_PRE_CLUSTERED_GROUP_BY: {
                 GroupByOperator gby = (GroupByOperator) op;
-                AbstractPreclusteredGroupByPOperator preSortedGby = (AbstractPreclusteredGroupByPOperator) pOp;
-                preSortedGby.setGbyColumns(gby.getGbyVarList());
+                AbstractGroupByPOperator gbyPhysOp = (AbstractGroupByPOperator) pOp;
+                gbyPhysOp.setGroupByColumns(gby.getGroupByVarList());
                 break;
             }
             case PRE_SORTED_DISTINCT_BY:
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
index 192e318..4c57f21 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java
@@ -103,7 +103,7 @@
 
                         //replace preclustered gby with sort gby
                         if (!groupByOperator.isGroupAll()) {
-                            op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByList(),
+                            op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByVarList(),
                                     context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(),
                                     sortPhysicalOperator.getSortColumns()));
                         }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index a7bf11e..1d5a7e9 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -124,7 +124,6 @@
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
             IOptimizationContext context) throws AlgebricksException {
         PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
@@ -175,10 +174,11 @@
 
                                 boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
                                 if (hasIntermediateAgg) {
-                                    ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
-                                            gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesForGroupBy(),
-                                            (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
-                                                    * physicalOptimizationConfig.getFrameSize());
+                                    ExternalGroupByPOperator externalGby =
+                                            new ExternalGroupByPOperator(gby.getGroupByVarList(),
+                                                    physicalOptimizationConfig.getMaxFramesForGroupBy(),
+                                                    (long) physicalOptimizationConfig.getMaxFramesForGroupBy()
+                                                            * physicalOptimizationConfig.getFrameSize());
                                     op.setPhysicalOperator(externalGby);
                                     break;
                                 }
@@ -186,20 +186,12 @@
                         }
                     }
 
-                    List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
-                    List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
-                    for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
-                        ILogicalExpression expr = p.second.getValue();
-                        if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
-                            columnList.add(varRef.getVariableReference());
-                        }
-                    }
                     if (topLevelOp) {
-                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
-                                context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
+                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(gby.getGroupByVarList(),
+                                gby.isGroupAll(), context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
                     } else {
-                        op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList));
+                        op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(gby.getGroupByVarList(),
+                                context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy()));
                     }
                     break;
                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
index 4678887..b778d06 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/group/MicroPreClusteredGroupRuntimeFactory.java
@@ -39,10 +39,11 @@
     private final IAggregatorDescriptorFactory aggregatorFactory;
     private final RecordDescriptor inRecordDesc;
     private final RecordDescriptor outRecordDesc;
+    private final int framesLimit;
 
     public MicroPreClusteredGroupRuntimeFactory(int[] groupFields, IBinaryComparatorFactory[] comparatorFactories,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDesc, int[] projectionList) {
+            RecordDescriptor outRecordDesc, int[] projectionList, int framesLimit) {
         super(projectionList);
         // Obs: the projection list is currently ignored.
         if (projectionList != null) {
@@ -53,6 +54,7 @@
         this.aggregatorFactory = aggregatorFactory;
         this.inRecordDesc = inRecordDesc;
         this.outRecordDesc = outRecordDesc;
+        this.framesLimit = framesLimit;
     }
 
     @Override
@@ -70,7 +72,7 @@
             @Override
             public void open() throws HyracksDataException {
                 pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
-                        outRecordDesc, writer);
+                        outRecordDesc, writer, false, false, framesLimit);
                 pgw.open();
             }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 395b553..99967c1 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -443,7 +443,7 @@
         RecordDescriptor gbyDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         PreclusteredGroupOperatorDescriptor gby = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 3 },
-                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, gbyDesc);
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, gbyDesc, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, gby,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -853,7 +853,7 @@
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
         MicroPreClusteredGroupRuntimeFactory gby = new MicroPreClusteredGroupRuntimeFactory(new int[] { 3 },
                 new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE }, npaaf, sortDesc, gbyDesc,
-                null);
+                null, -1);
 
         // the algebricks op.
         IScalarEvaluatorFactory cond =
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 739de74..8cf8401 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -39,18 +39,6 @@
 
     public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor) {
-        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
-    }
-
-    public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
-            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor, int framesLimit) {
-        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, framesLimit);
-    }
-
-    public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
-            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor recordDescriptor, boolean groupAll, int framesLimit) {
         super(spec, 1, 1);
         this.groupFields = groupFields;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index bb731fe..14ce5ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -60,12 +60,6 @@
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
-        this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false, -1);
-    }
-
-    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
-            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
         this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, outputPartial,
                 false, -1);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index feba97e..da85e74 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -125,7 +125,7 @@
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                         new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
                         new FloatSumFieldAggregatorFactory(5, true) }),
-                outputRec);
+                outputRec, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -214,7 +214,7 @@
                         new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                                 new IntSumFieldAggregatorFactory(1, true), new CountFieldAggregatorFactory(true),
                                 new AvgFieldGroupAggregatorFactory(1, true) }),
-                        outputRec);
+                        outputRec, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -303,7 +303,7 @@
                 new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
                                 new MinMaxStringFieldAggregatorFactory(15, true, false) }),
-                outputRec);
+                outputRec, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -391,7 +391,7 @@
                         UTF8StringBinaryComparatorFactory.INSTANCE },
                 new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                         new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true) }),
-                outputRec);
+                outputRec, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -485,7 +485,7 @@
                 new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
                                 new CountFieldAggregatorFactory(true), new AvgFieldGroupAggregatorFactory(1, true) }),
-                outputRec);
+                outputRec, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
@@ -583,7 +583,7 @@
                 new MultiFieldsAggregatorFactory(
                         new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, true),
                                 new MinMaxStringFieldAggregatorFactory(15, true, false) }),
-                outputRec);
+                outputRec, false, -1);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
index 508d017..367fc51d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/CountOfCountsTest.java
@@ -78,12 +78,11 @@
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
                 new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group =
-                new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                        new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        desc2);
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                desc2, false, -1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -92,12 +91,11 @@
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group2 =
-                new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                        new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        desc3);
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                desc3, false, -1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -149,12 +147,11 @@
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
                 new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group =
-                new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                        new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        desc2);
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                desc2, false, -1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -163,12 +160,11 @@
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group2 =
-                new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                        new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        desc3);
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                desc3, false, -1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -221,12 +217,11 @@
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
                 new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group =
-                new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                        new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        desc2);
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                desc2, false, -1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group, NC1_ID, NC2_ID, NC1_ID, NC2_ID);
 
         InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
@@ -235,12 +230,11 @@
 
         RecordDescriptor desc3 = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group2 =
-                new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                        new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
-                        new MultiFieldsAggregatorFactory(
-                                new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                        desc3);
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
+                new IBinaryComparatorFactory[] { IntegerBinaryComparatorFactory.INSTANCE },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                desc3, false, -1);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 43598a5..cf04bee 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -192,7 +192,7 @@
                     new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
                     new MultiFieldsAggregatorFactory(
                             new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    groupResultDesc);
+                    groupResultDesc, false, -1);
             createPartitionConstraint(spec, gBy, outSplits);
             OneToOneConnectorDescriptor sortGroupConn = new OneToOneConnectorDescriptor(spec);
             spec.connect(sortGroupConn, sorter, 0, gBy, 0);
