[ASTERIXDB-2837][COMP] Improve subplan consolidation

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Improve EliminateIsomorphicSubplanRule so it can
  consolidate subplans with different AGGREGATE and
  ASSIGN operators at the top of their nested plans
- Improve PushAggFuncIntoStandaloneAggregateRule and
  PushAggregateIntoNestedSubplanRule so they can handle
  aggregate operators with multiple output variables

Change-Id: I4c205b962446eaae7b1394b46be3f11c2b3e25c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10584
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 3cd3e61..fc52f89 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -185,6 +185,8 @@
         normalization.add(new CheckInsertUpsertReturningRule());
         normalization.add(new IntroduceUnnestForCollectionToSequenceRule());
         normalization.add(new EliminateSubplanRule());
+        // The following rule must run before PushAggregateIntoNestedSubplanRule
+        normalization.add(new EliminateIsomorphicSubplanRule());
         normalization.add(new EnforceOrderByAfterSubplan());
         normalization.add(new BreakSelectIntoConjunctsRule());
         normalization.add(new ExtractGbyExpressionsRule());
@@ -236,9 +238,6 @@
         condPushDownAndJoinInference
                 .add(new AsterixPushMapOperatorThroughUnionRule(LogicalOperatorTag.ASSIGN, LogicalOperatorTag.SELECT));
         condPushDownAndJoinInference.add(new SubplanOutOfGroupRule());
-        // The following rule must run before PushAggregateIntoNestedSubplanRule
-        // (before common subplans diverge due to aggregate pushdown)
-        condPushDownAndJoinInference.add(new EliminateIsomorphicSubplanRule());
 
         condPushDownAndJoinInference.add(new AsterixExtractFunctionsFromJoinConditionRule());
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
index 1fff73a..70b5450 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggFuncIntoStandaloneAggregateRule.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.optimizer.rules;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.asterix.lang.sqlpp.util.SqlppVariableUtil;
@@ -72,13 +71,13 @@
         if (op2.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
             AggregateOperator aggOp = (AggregateOperator) op2;
             // Make sure the agg expr is a listify.
-            return pushAggregateFunction(aggOp, assignOp, context);
+            return pushAggregateFunction(assignOp, aggOp, context);
         } else if (op2.getOperatorTag() == LogicalOperatorTag.INNERJOIN
                 || op2.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
             AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op2;
             // Tries to push aggregates through the join.
             if (containsAggregate(assignOp.getExpressions()) && pushableThroughJoin(join)) {
-                return pushAggregateFunctionThroughJoin(join, assignOp, context);
+                return pushAggregateFunctionThroughJoin(assignOp, join, context);
             }
         }
         return false;
@@ -87,7 +86,6 @@
     /**
      * Recursively check whether the list of expressions contains an aggregate function.
      *
-     * @param exprRefs
      * @return true if the list contains an aggregate function and false otherwise.
      */
     private boolean containsAggregate(List<Mutable<ILogicalExpression>> exprRefs) {
@@ -116,7 +114,6 @@
      * 1) the join condition is true;
      * 2) each join branch produces only one tuple.
      *
-     * @param join
      * @return true if pushable
      */
     private boolean pushableThroughJoin(AbstractBinaryJoinOperator join) {
@@ -144,100 +141,114 @@
     /**
      * Does the actual push of aggregates for qualified joins.
      *
-     * @param join
      * @param assignOp
      *            that contains aggregate function calls.
      * @param context
      * @throws AlgebricksException
      */
-    private boolean pushAggregateFunctionThroughJoin(AbstractBinaryJoinOperator join, AssignOperator assignOp,
+    private boolean pushAggregateFunctionThroughJoin(AssignOperator assignOp, AbstractBinaryJoinOperator join,
             IOptimizationContext context) throws AlgebricksException {
         boolean applied = false;
         for (Mutable<ILogicalOperator> branchRef : join.getInputs()) {
             AbstractLogicalOperator branch = (AbstractLogicalOperator) branchRef.getValue();
             if (branch.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
                 AggregateOperator aggOp = (AggregateOperator) branch;
-                applied |= pushAggregateFunction(aggOp, assignOp, context);
+                applied |= pushAggregateFunction(assignOp, aggOp, context);
             } else if (branch.getOperatorTag() == LogicalOperatorTag.INNERJOIN
                     || branch.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
                 AbstractBinaryJoinOperator childJoin = (AbstractBinaryJoinOperator) branch;
-                applied |= pushAggregateFunctionThroughJoin(childJoin, assignOp, context);
+                applied |= pushAggregateFunctionThroughJoin(assignOp, childJoin, context);
             }
         }
         return applied;
     }
 
-    private boolean pushAggregateFunction(AggregateOperator aggOp, AssignOperator assignOp,
+    private boolean pushAggregateFunction(AssignOperator assignOp, AggregateOperator aggOp,
             IOptimizationContext context) throws AlgebricksException {
-        Mutable<ILogicalOperator> opRef3 = aggOp.getInputs().get(0);
-        AbstractLogicalOperator op3 = (AbstractLogicalOperator) opRef3.getValue();
+        Mutable<ILogicalOperator> aggChilldOpRef = aggOp.getInputs().get(0);
+        AbstractLogicalOperator aggChildOp = (AbstractLogicalOperator) aggChilldOpRef.getValue();
         // If there's a group by below the agg, then we want to have the agg pushed into the group by
-        if (op3.getOperatorTag() == LogicalOperatorTag.GROUP && !((GroupByOperator) op3).getNestedPlans().isEmpty()) {
-            return false;
-        }
-        if (aggOp.getVariables().size() != 1) {
-            return false;
-        }
-        ILogicalExpression aggExpr = aggOp.getExpressions().get(0).getValue();
-        if (aggExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
-            return false;
-        }
-        AbstractFunctionCallExpression origAggFuncExpr = (AbstractFunctionCallExpression) aggExpr;
-        if (origAggFuncExpr.getFunctionIdentifier() != BuiltinFunctions.LISTIFY) {
+        if (aggChildOp.getOperatorTag() == LogicalOperatorTag.GROUP
+                && !((GroupByOperator) aggChildOp).getNestedPlans().isEmpty()) {
             return false;
         }
 
-        LogicalVariable aggVar = aggOp.getVariables().get(0);
-        List<LogicalVariable> used = new LinkedList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(assignOp, used);
-        if (!used.contains(aggVar)) {
+        List<LogicalVariable> assignUsedVars = new ArrayList<>();
+        VariableUtilities.getUsedVariables(assignOp, assignUsedVars);
+
+        List<Mutable<ILogicalExpression>> assignScalarAggExprRefs = new ArrayList<>();
+        List<LogicalVariable> aggAddVars = null;
+        List<Mutable<ILogicalExpression>> aggAddExprs = null;
+
+        for (int i = 0, n = aggOp.getVariables().size(); i < n; i++) {
+            LogicalVariable aggVar = aggOp.getVariables().get(i);
+            Mutable<ILogicalExpression> aggExprRef = aggOp.getExpressions().get(i);
+            ILogicalExpression aggExpr = aggExprRef.getValue();
+            if (aggExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                continue;
+            }
+            AbstractFunctionCallExpression listifyCandidateExpr = (AbstractFunctionCallExpression) aggExpr;
+            if (listifyCandidateExpr.getFunctionIdentifier() != BuiltinFunctions.LISTIFY) {
+                continue;
+            }
+            if (!assignUsedVars.contains(aggVar)) {
+                continue;
+            }
+            assignScalarAggExprRefs.clear();
+            findScalarAggFuncExprRef(assignOp.getExpressions(), aggVar, assignScalarAggExprRefs);
+            if (assignScalarAggExprRefs.isEmpty()) {
+                continue;
+            }
+            // perform rewrite
+            if (aggAddVars == null) {
+                aggAddVars = new ArrayList<>();
+                aggAddExprs = new ArrayList<>();
+            }
+            for (Mutable<ILogicalExpression> assignScalarAggExprRef : assignScalarAggExprRefs) {
+                AbstractFunctionCallExpression assignScalarAggExpr =
+                        (AbstractFunctionCallExpression) assignScalarAggExprRef.getValue();
+                FunctionIdentifier aggFuncIdent =
+                        BuiltinFunctions.getAggregateFunction(assignScalarAggExpr.getFunctionIdentifier());
+
+                // Push the scalar aggregate function into the aggregate op.
+                int sz = assignScalarAggExpr.getArguments().size();
+                List<Mutable<ILogicalExpression>> aggArgs = new ArrayList<>(sz);
+                aggArgs.add(listifyCandidateExpr.getArguments().get(0));
+                aggArgs.addAll(assignScalarAggExpr.getArguments().subList(1, sz));
+                AggregateFunctionCallExpression aggFuncExpr =
+                        BuiltinFunctions.makeAggregateFunctionExpression(aggFuncIdent, aggArgs);
+                aggFuncExpr.setSourceLocation(assignScalarAggExpr.getSourceLocation());
+
+                LogicalVariable newVar = context.newVar();
+                aggAddVars.add(newVar);
+                aggAddExprs.add(new MutableObject<>(aggFuncExpr));
+                // The assign now just "renames" the variable to make sure the upstream plan still works.
+                VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
+                newVarRef.setSourceLocation(assignScalarAggExpr.getSourceLocation());
+                assignScalarAggExprRef.setValue(newVarRef);
+            }
+        }
+
+        if (aggAddVars == null) {
             return false;
         }
 
-        List<Mutable<ILogicalExpression>> srcAssignExprRefs = new LinkedList<Mutable<ILogicalExpression>>();
-        findAggFuncExprRef(assignOp.getExpressions(), aggVar, srcAssignExprRefs);
-        if (srcAssignExprRefs.isEmpty()) {
-            return false;
-        }
+        // add new variables and expressions to the aggregate operator.
+        aggOp.getVariables().addAll(aggAddVars);
+        aggOp.getExpressions().addAll(aggAddExprs);
 
-        AbstractFunctionCallExpression aggOpExpr =
-                (AbstractFunctionCallExpression) aggOp.getExpressions().get(0).getValue();
-        aggOp.getExpressions().clear();
-        aggOp.getVariables().clear();
-
-        for (Mutable<ILogicalExpression> srcAssignExprRef : srcAssignExprRefs) {
-            AbstractFunctionCallExpression assignFuncExpr =
-                    (AbstractFunctionCallExpression) srcAssignExprRef.getValue();
-            FunctionIdentifier aggFuncIdent =
-                    BuiltinFunctions.getAggregateFunction(assignFuncExpr.getFunctionIdentifier());
-
-            // Push the agg func into the agg op.
-
-            List<Mutable<ILogicalExpression>> aggArgs = new ArrayList<Mutable<ILogicalExpression>>();
-            aggArgs.add(aggOpExpr.getArguments().get(0));
-            int sz = assignFuncExpr.getArguments().size();
-            aggArgs.addAll(assignFuncExpr.getArguments().subList(1, sz));
-            AggregateFunctionCallExpression aggFuncExpr =
-                    BuiltinFunctions.makeAggregateFunctionExpression(aggFuncIdent, aggArgs);
-
-            aggFuncExpr.setSourceLocation(assignFuncExpr.getSourceLocation());
-            LogicalVariable newVar = context.newVar();
-            aggOp.getVariables().add(newVar);
-            aggOp.getExpressions().add(new MutableObject<ILogicalExpression>(aggFuncExpr));
-
-            // The assign now just "renames" the variable to make sure the upstream plan still works.
-            VariableReferenceExpression newVarRef = new VariableReferenceExpression(newVar);
-            newVarRef.setSourceLocation(assignFuncExpr.getSourceLocation());
-            srcAssignExprRef.setValue(newVarRef);
-        }
+        // Note: we retain the original listify() call in the aggregate operator because
+        // the variable it is assigned to might be used upstream by other operators.
+        // If the variable is not used upstream then it'll later be removed
+        // by {@code RemoveUnusedAssignAndAggregateRule}
 
         context.computeAndSetTypeEnvironmentForOperator(aggOp);
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
         return true;
     }
 
-    private void findAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar,
-            List<Mutable<ILogicalExpression>> srcAssignExprRefs) {
+    private void findScalarAggFuncExprRef(List<Mutable<ILogicalExpression>> exprRefs, LogicalVariable aggVar,
+            List<Mutable<ILogicalExpression>> outScalarAggExprRefs) {
         for (Mutable<ILogicalExpression> exprRef : exprRefs) {
             ILogicalExpression expr = exprRef.getValue();
             if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
@@ -245,10 +256,10 @@
                 FunctionIdentifier funcIdent = BuiltinFunctions.getAggregateFunction(funcExpr.getFunctionIdentifier());
                 if (funcIdent != null
                         && aggVar.equals(SqlppVariableUtil.getVariable(funcExpr.getArguments().get(0).getValue()))) {
-                    srcAssignExprRefs.add(exprRef);
+                    outScalarAggExprRefs.add(exprRef);
                 } else {
                     // Recursively look in func args.
-                    findAggFuncExprRef(funcExpr.getArguments(), aggVar, srcAssignExprRefs);
+                    findScalarAggFuncExprRef(funcExpr.getArguments(), aggVar, outScalarAggExprRefs);
                 }
             }
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
index f8457cc..d5f7b0a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushAggregateIntoNestedSubplanRule.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.optimizer.rules;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -208,55 +207,38 @@
     private void collectAggregateVars(Map<LogicalVariable, Integer> nspListifyVarsCount,
             Map<LogicalVariable, AbstractOperatorWithNestedPlans> nspWithAgg,
             Map<LogicalVariable, Integer> nspAggVarToPlanIndex, AbstractOperatorWithNestedPlans op) {
-        List<LogicalVariable> vars = collectOneVarPerAggFromOpWithNestedPlans(op);
-        for (int i = 0; i < vars.size(); i++) {
-            LogicalVariable v = vars.get(i);
-            if (v != null) {
-                nspListifyVarsCount.put(v, 0);
-                nspAggVarToPlanIndex.put(v, i);
-                nspWithAgg.put(v, op);
-            }
-        }
-    }
-
-    private List<LogicalVariable> collectOneVarPerAggFromOpWithNestedPlans(AbstractOperatorWithNestedPlans op) {
         List<ILogicalPlan> nPlans = op.getNestedPlans();
-        if (nPlans == null || nPlans.isEmpty()) {
-            return Collections.emptyList();
+        for (int planIdx = 0, planCount = nPlans.size(); planIdx < planCount; planIdx++) {
+            ILogicalPlan nestedPlan = nPlans.get(planIdx);
+            List<Mutable<ILogicalOperator>> roots = nestedPlan.getRoots();
+            if (roots.size() != 1) {
+                continue;
+            }
+            AbstractLogicalOperator rootOp = (AbstractLogicalOperator) roots.get(0).getValue();
+            if (rootOp.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+                continue;
+            }
+            AggregateOperator agg = (AggregateOperator) rootOp;
+            // TODO: for now we only consider aggregate operators with every expression being listify()
+            // in the future we should check whether this can be relaxed (i.e. if some expressions are listify())
+            boolean everyExprIsListify = agg.getExpressions().stream().map(Mutable::getValue)
+                    .allMatch(expr -> expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL
+                            && ((AbstractFunctionCallExpression) expr).getFunctionIdentifier()
+                                    .equals(BuiltinFunctions.LISTIFY));
+            if (everyExprIsListify) {
+                for (LogicalVariable v : agg.getVariables()) {
+                    nspListifyVarsCount.put(v, 0);
+                    nspAggVarToPlanIndex.put(v, planIdx);
+                    nspWithAgg.put(v, op);
+                }
+            }
         }
-
-        List<LogicalVariable> aggVars = new ArrayList<>();
-        // test that the operator computes a "listify" aggregate
-        for (int i = 0; i < nPlans.size(); i++) {
-            AbstractLogicalOperator topOp = (AbstractLogicalOperator) nPlans.get(i).getRoots().get(0).getValue();
-            if (topOp.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
-                continue;
-            }
-            AggregateOperator agg = (AggregateOperator) topOp;
-            if (agg.getVariables().size() != 1) {
-                continue;
-            }
-            ILogicalExpression expr = agg.getExpressions().get(0).getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
-                continue;
-            }
-            AbstractFunctionCallExpression fceAgg = (AbstractFunctionCallExpression) expr;
-            if (fceAgg.getFunctionIdentifier() != BuiltinFunctions.LISTIFY) {
-                continue;
-            }
-            aggVars.add(agg.getVariables().get(0));
-        }
-        return aggVars;
     }
 
     /**
-     * @param exprRef
-     * @param nspWithAgg
-     * @param context
      * @return a pair whose first member is a boolean which is true iff
      *         something was changed in the expression tree rooted at expr. The
      *         second member is the result of transforming expr.
-     * @throws AlgebricksException
      */
     private Pair<Boolean, ILogicalExpression> extractAggFunctionsFromExpression(Mutable<ILogicalExpression> exprRef,
             Map<LogicalVariable, AbstractOperatorWithNestedPlans> nspWithAgg,
@@ -296,7 +278,7 @@
                 for (Mutable<ILogicalExpression> a : fce.getArguments()) {
                     Pair<Boolean, ILogicalExpression> aggArg =
                             extractAggFunctionsFromExpression(a, nspWithAgg, aggregateExprToVarExpr, context);
-                    if (aggArg.first.booleanValue()) {
+                    if (aggArg.first) {
                         a.setValue(aggArg.second);
                         change = true;
                     }
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-subclause/agg_filter_01/agg_filter_01.10.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-subclause/agg_filter_01/agg_filter_01.10.sqlpp
new file mode 100644
index 0000000..dca81bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-subclause/agg_filter_01/agg_filter_01.10.sqlpp
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test various combinations of grouping sets
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type tenkType as closed {
+  unique1         : integer,
+  unique2         : integer,
+  two             : integer,
+  four            : integer,
+  ten             : integer,
+  twenty          : integer,
+  hundred         : integer,
+  thousand        : integer,
+  twothous        : integer,
+  fivethous       : integer,
+  tenthous        : integer,
+  odd100          : integer,
+  even100         : integer,
+  stringu1        : string,
+  stringu2        : string,
+  string4         : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
+
+select
+  ten,
+  count(*) filter(where four > 0) as cnt,
+  min(two) filter(where four > 0) as min2,
+  max(two) filter(where four > 0) as max2,
+  sum(twenty) filter(where four > 0) as sum20
+from tenk
+group by ten
+order by ten;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-subclause/agg_filter_01/agg_filter_01.9.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-subclause/agg_filter_01/agg_filter_01.9.sqlpp
new file mode 100644
index 0000000..686d83f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/aggregate-subclause/agg_filter_01/agg_filter_01.9.sqlpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test various combinations of grouping sets
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type tenkType as closed {
+  unique1         : integer,
+  unique2         : integer,
+  two             : integer,
+  four            : integer,
+  ten             : integer,
+  twenty          : integer,
+  hundred         : integer,
+  thousand        : integer,
+  twothous        : integer,
+  fivethous       : integer,
+  tenthous        : integer,
+  odd100          : integer,
+  even100         : integer,
+  stringu1        : string,
+  stringu2        : string,
+  string4         : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
+
+select
+  count(*) filter(where four > 0) as cnt,
+  min(two) filter(where four > 0) as min2,
+  max(two) filter(where four > 0) as max2,
+  sum(twenty) filter(where four > 0) as sum20
+from tenk;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.10.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.10.plan
new file mode 100644
index 0000000..63d3f7c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.10.plan
@@ -0,0 +1,27 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$ten(ASC) ]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$95]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- HASH_PARTITION_EXCHANGE [$$95]  |PARTITIONED|
+              -- PRE_CLUSTERED_GROUP_BY[$$82]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- STREAM_SELECT  |LOCAL|
+                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STABLE_SORT [$$82(ASC)]  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.3.plan
index bc5d5aa..574df58 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.3.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.3.plan
@@ -2,29 +2,11 @@
   -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
     -- STREAM_PROJECT  |UNPARTITIONED|
       -- ASSIGN  |UNPARTITIONED|
-        -- STREAM_PROJECT  |UNPARTITIONED|
-          -- SUBPLAN  |UNPARTITIONED|
-                  {
-                    -- AGGREGATE  |LOCAL|
-                      -- AGGREGATE  |LOCAL|
-                        -- STREAM_SELECT  |UNPARTITIONED|
-                          -- ASSIGN  |UNPARTITIONED|
-                            -- ASSIGN  |UNPARTITIONED|
-                              -- UNNEST  |UNPARTITIONED|
-                                -- NESTED_TUPLE_SOURCE  |UNPARTITIONED|
-                  }
-            -- SUBPLAN  |UNPARTITIONED|
-                    {
-                      -- AGGREGATE  |LOCAL|
-                        -- AGGREGATE  |LOCAL|
-                          -- STREAM_SELECT  |UNPARTITIONED|
-                            -- ASSIGN  |UNPARTITIONED|
-                              -- ASSIGN  |UNPARTITIONED|
-                                -- UNNEST  |UNPARTITIONED|
-                                  -- NESTED_TUPLE_SOURCE  |UNPARTITIONED|
-                    }
-              -- AGGREGATE  |UNPARTITIONED|
-                -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        -- AGGREGATE  |UNPARTITIONED|
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            -- AGGREGATE  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ASSIGN  |PARTITIONED|
                       -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.4.plan
index 9071921..c94ef11 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.4.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.4.plan
@@ -3,36 +3,25 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$two(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$69]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$67]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-                  {
-                    -- AGGREGATE  |LOCAL|
-                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                  }
-            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$69(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$69]  |PARTITIONED|
-                  -- PRE_CLUSTERED_GROUP_BY[$$58]  |PARTITIONED|
-                          {
-                            -- AGGREGATE  |LOCAL|
-                              -- STREAM_SELECT  |LOCAL|
-                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                          }
-                          {
-                            -- AGGREGATE  |LOCAL|
-                              -- STREAM_SELECT  |LOCAL|
-                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                          }
+            -- HASH_PARTITION_EXCHANGE [$$67]  |PARTITIONED|
+              -- PRE_CLUSTERED_GROUP_BY[$$58]  |PARTITIONED|
+                      {
+                        -- AGGREGATE  |LOCAL|
+                          -- STREAM_SELECT  |LOCAL|
+                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                      }
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STABLE_SORT [$$58(ASC)]  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- STABLE_SORT [$$58(ASC)]  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
-                            -- ASSIGN  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan
index 91e2624..9462514 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.5.plan
@@ -1,90 +1,68 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
-      -- SORT_MERGE_EXCHANGE [$$197(ASC) ]  |PARTITIONED|
-        -- STABLE_SORT [$$197(ASC)]  |PARTITIONED|
+      -- SORT_MERGE_EXCHANGE [$$189(ASC) ]  |PARTITIONED|
+        -- STABLE_SORT [$$189(ASC)]  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
             -- UNION_ALL  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- PRE_CLUSTERED_GROUP_BY[$$244]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$236]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                              {
-                                -- AGGREGATE  |LOCAL|
-                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                              }
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- STABLE_SORT [$$244(ASC)]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$244]  |PARTITIONED|
-                              -- PRE_CLUSTERED_GROUP_BY[$$115]  |PARTITIONED|
-                                      {
-                                        -- AGGREGATE  |LOCAL|
-                                          -- STREAM_SELECT  |LOCAL|
-                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                      }
-                                      {
-                                        -- AGGREGATE  |LOCAL|
-                                          -- STREAM_SELECT  |LOCAL|
-                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                      }
+                        -- HASH_PARTITION_EXCHANGE [$$236]  |PARTITIONED|
+                          -- PRE_CLUSTERED_GROUP_BY[$$115]  |PARTITIONED|
+                                  {
+                                    -- AGGREGATE  |LOCAL|
+                                      -- STREAM_SELECT  |LOCAL|
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                  }
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STABLE_SORT [$$115(ASC)]  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STABLE_SORT [$$115(ASC)]  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- REPLICATE  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- REPLICATE  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- PRE_CLUSTERED_GROUP_BY[$$247]  |PARTITIONED|
+                        -- SORT_GROUP_BY[$$239]  |PARTITIONED|
                                 {
                                   -- AGGREGATE  |LOCAL|
                                     -- NESTED_TUPLE_SOURCE  |LOCAL|
                                 }
-                                {
-                                  -- AGGREGATE  |LOCAL|
-                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                }
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- STABLE_SORT [$$247(ASC)]  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$247]  |PARTITIONED|
-                                -- PRE_CLUSTERED_GROUP_BY[$$116]  |PARTITIONED|
-                                        {
-                                          -- AGGREGATE  |LOCAL|
-                                            -- STREAM_SELECT  |LOCAL|
-                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                        }
-                                        {
-                                          -- AGGREGATE  |LOCAL|
-                                            -- STREAM_SELECT  |LOCAL|
-                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                        }
+                          -- HASH_PARTITION_EXCHANGE [$$239]  |PARTITIONED|
+                            -- PRE_CLUSTERED_GROUP_BY[$$116]  |PARTITIONED|
+                                    {
+                                      -- AGGREGATE  |LOCAL|
+                                        -- STREAM_SELECT  |LOCAL|
+                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                    }
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- STABLE_SORT [$$116(ASC)]  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ASSIGN  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- REPLICATE  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- REPLICATE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.9.plan
new file mode 100644
index 0000000..02d8027
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/aggregate-subclause/agg_filter_01/agg_filter_01.9.plan
@@ -0,0 +1,16 @@
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      -- ASSIGN  |UNPARTITIONED|
+        -- AGGREGATE  |UNPARTITIONED|
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            -- AGGREGATE  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- STREAM_SELECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- DATASOURCE_SCAN (test.tenk)  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt_ps.plan
index 0df14c7..0128265 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/q01_pricing_summary_report_nt_ps.plan
@@ -22,12 +22,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- EXTERNAL_GROUP_BY[$$212, $$213]  |PARTITIONED|
+                      -- EXTERNAL_GROUP_BY[$$206, $$207]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$212, $$213]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$206, $$207]  |PARTITIONED|
                           -- EXTERNAL_GROUP_BY[$$181, $$182]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -51,12 +51,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- EXTERNAL_GROUP_BY[$$212, $$213]  |PARTITIONED|
+                                -- EXTERNAL_GROUP_BY[$$206, $$207]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$212, $$213]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$206, $$207]  |PARTITIONED|
                                     -- EXTERNAL_GROUP_BY[$$181, $$182]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806.plan
index 5d874f3..1feedcd 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$165, $$166]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$159, $$160]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$165, $$166]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$159, $$160]  |PARTITIONED|
               -- SORT_GROUP_BY[$$133, $$134]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806_ps.plan
index d918b1b..0d669e0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1806_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$165, $$166]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$159, $$160]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$165, $$166]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$159, $$160]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$133, $$134]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -37,12 +37,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$165, $$166]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$159, $$160]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$165, $$166]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$159, $$160]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$133, $$134]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
index 8985321..cb471a9 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$cntrycode(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$188]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$187]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$187]  |PARTITIONED|
               -- SORT_GROUP_BY[$$162]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,20 +21,20 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- SORT_GROUP_BY[$$185]  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$184]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$184]  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$178]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- STREAM_SELECT  |LOCAL|
                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
+                                      -- STABLE_SORT [$$178(ASC)]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan
index e98c53a..52d6e25 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$188]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$187]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$187]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$162]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -27,20 +27,20 @@
                                     -- STREAM_SELECT  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- SORT_GROUP_BY[$$185]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$184]  |PARTITIONED|
                                                   {
                                                     -- AGGREGATE  |LOCAL|
                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                   }
-                                            -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$184]  |PARTITIONED|
+                                              -- PRE_CLUSTERED_GROUP_BY[$$178]  |PARTITIONED|
                                                       {
                                                         -- AGGREGATE  |LOCAL|
                                                           -- STREAM_SELECT  |LOCAL|
                                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                       }
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
+                                                  -- STABLE_SORT [$$178(ASC)]  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -86,12 +86,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$188]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$187]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$187]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$162]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
@@ -104,20 +104,20 @@
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$185]  |PARTITIONED|
+                                                    -- SORT_GROUP_BY[$$184]  |PARTITIONED|
                                                             {
                                                               -- AGGREGATE  |LOCAL|
                                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                             }
-                                                      -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
-                                                        -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$184]  |PARTITIONED|
+                                                        -- PRE_CLUSTERED_GROUP_BY[$$178]  |PARTITIONED|
                                                                 {
                                                                   -- AGGREGATE  |LOCAL|
                                                                     -- STREAM_SELECT  |LOCAL|
                                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                                 }
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
+                                                            -- STABLE_SORT [$$178(ASC)]  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
index 0120576..d631085 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$cntrycode(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$189]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$188]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$189]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
               -- SORT_GROUP_BY[$$163]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,20 +21,20 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- SORT_GROUP_BY[$$186]  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$185]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$180]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- STREAM_SELECT  |LOCAL|
                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$180(ASC)]  |PARTITIONED|
+                                      -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan
index d1f0a82..5ab7d30 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$189]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$188]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$189]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$163]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -27,20 +27,20 @@
                                     -- STREAM_SELECT  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- SORT_GROUP_BY[$$186]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$185]  |PARTITIONED|
                                                   {
                                                     -- AGGREGATE  |LOCAL|
                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                   }
-                                            -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$180]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                              -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
                                                       {
                                                         -- AGGREGATE  |LOCAL|
                                                           -- STREAM_SELECT  |LOCAL|
                                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                       }
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$180(ASC)]  |PARTITIONED|
+                                                  -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -86,12 +86,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$189]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$188]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$189]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$163]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
@@ -104,20 +104,20 @@
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$186]  |PARTITIONED|
+                                                    -- SORT_GROUP_BY[$$185]  |PARTITIONED|
                                                             {
                                                               -- AGGREGATE  |LOCAL|
                                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                             }
-                                                      -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
-                                                        -- PRE_CLUSTERED_GROUP_BY[$$180]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                                        -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
                                                                 {
                                                                   -- AGGREGATE  |LOCAL|
                                                                     -- STREAM_SELECT  |LOCAL|
                                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                                 }
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STABLE_SORT [$$180(ASC)]  |PARTITIONED|
+                                                            -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
index 9d91445..0be7ab2 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_shipmode(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$132]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$131]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$132]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$131]  |PARTITIONED|
               -- SORT_GROUP_BY[$$114]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -17,12 +17,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- HYBRID_HASH_JOIN [$$114][$$121]  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$114][$$120]  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- HYBRID_HASH_JOIN [$$123][$$118]  |PARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$122][$$118]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$122]  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- STREAM_SELECT  |PARTITIONED|
                                       -- ASSIGN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
index 1fe6eb3..9908238 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_shipmode(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$132]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$131]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$132]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$131]  |PARTITIONED|
               -- SORT_GROUP_BY[$$114]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -17,11 +17,11 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- HYBRID_HASH_JOIN [$$114][$$121]  |PARTITIONED|
+                      -- HYBRID_HASH_JOIN [$$114][$$120]  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- HYBRID_HASH_JOIN [$$122][$$118]  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$121][$$118]  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- STREAM_SELECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast_ps.plan
index d86b6b0..baeda7a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_broadcast_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$132]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$131]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$132]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$131]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$114]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -23,11 +23,11 @@
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$114][$$121]  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$114][$$120]  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$122][$$118]  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$121][$$118]  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- STREAM_SELECT  |PARTITIONED|
@@ -55,12 +55,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$132]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$131]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$132]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$131]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$114]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
@@ -69,11 +69,11 @@
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- STREAM_PROJECT  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- HYBRID_HASH_JOIN [$$114][$$121]  |PARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$114][$$120]  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- HYBRID_HASH_JOIN [$$122][$$118]  |PARTITIONED|
+                                                    -- HYBRID_HASH_JOIN [$$121][$$118]  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                           -- STREAM_SELECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_ps.plan
index 9615a18..bbd6cf0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpch/q12_shipping_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$132]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$131]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$132]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$131]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$114]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -23,12 +23,12 @@
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$114][$$121]  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$114][$$120]  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$123][$$118]  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$122][$$118]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$122]  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- STREAM_SELECT  |PARTITIONED|
                                                   -- ASSIGN  |PARTITIONED|
@@ -55,12 +55,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$132]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$131]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$132]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$131]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$114]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
@@ -69,12 +69,12 @@
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- STREAM_PROJECT  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- HYBRID_HASH_JOIN [$$114][$$121]  |PARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$114][$$120]  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- HYBRID_HASH_JOIN [$$123][$$118]  |PARTITIONED|
-                                                      -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
+                                                    -- HYBRID_HASH_JOIN [$$122][$$118]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$122]  |PARTITIONED|
                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                           -- STREAM_SELECT  |PARTITIONED|
                                                             -- ASSIGN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_01.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_01.10.query.sqlpp
new file mode 100644
index 0000000..0fd64d2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_01.10.query.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test several various aggregates with filter with GROUP BY clause
+ */
+
+use test;
+
+select
+  ten,
+  count(*) filter(where four > 0) as cnt,
+  min(two) filter(where four > 0) as min2,
+  max(two) filter(where four > 0) as max2,
+  sum(twenty) filter(where four > 0) as sum20
+from tenk
+group by ten
+order by ten;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_01.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_01.9.query.sqlpp
new file mode 100644
index 0000000..c9566c3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_01.9.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test several various aggregates with filter without GROUP BY clause
+ */
+
+use test;
+
+select
+  count(*) filter(where four > 0) as cnt,
+  min(two) filter(where four > 0) as min2,
+  max(two) filter(where four > 0) as max2,
+  sum(twenty) filter(where four > 0) as sum20
+from tenk;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-subclause/agg_filter_01/agg_filter_01.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-subclause/agg_filter_01/agg_filter_01.10.adm
new file mode 100644
index 0000000..3b9c0aa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-subclause/agg_filter_01/agg_filter_01.10.adm
@@ -0,0 +1,10 @@
+{ "ten": 0, "cnt": 500, "min2": 0, "max2": 0, "sum20": 5000 }
+{ "ten": 1, "cnt": 1000, "min2": 1, "max2": 1, "sum20": 6000 }
+{ "ten": 2, "cnt": 500, "min2": 0, "max2": 0, "sum20": 1000 }
+{ "ten": 3, "cnt": 1000, "min2": 1, "max2": 1, "sum20": 8000 }
+{ "ten": 4, "cnt": 500, "min2": 0, "max2": 0, "sum20": 7000 }
+{ "ten": 5, "cnt": 1000, "min2": 1, "max2": 1, "sum20": 10000 }
+{ "ten": 6, "cnt": 500, "min2": 0, "max2": 0, "sum20": 3000 }
+{ "ten": 7, "cnt": 1000, "min2": 1, "max2": 1, "sum20": 12000 }
+{ "ten": 8, "cnt": 500, "min2": 0, "max2": 0, "sum20": 9000 }
+{ "ten": 9, "cnt": 1000, "min2": 1, "max2": 1, "sum20": 14000 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-subclause/agg_filter_01/agg_filter_01.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-subclause/agg_filter_01/agg_filter_01.9.adm
new file mode 100644
index 0000000..935fe6d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/aggregate-subclause/agg_filter_01/agg_filter_01.9.adm
@@ -0,0 +1 @@
+{ "cnt": 7500, "min2": 0, "max2": 1, "sum20": 75000 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_1.09.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_1.09.ast
new file mode 100644
index 0000000..3cecd60
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_1.09.ast
@@ -0,0 +1,125 @@
+DataverseUse test
+Query:
+SELECT [
+FunctionCall asterix.sql-count@1[
+  (
+    SELECT ELEMENT [
+    LiteralExpr [LONG] [1]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#2 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#2 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+cnt
+FunctionCall asterix.sql-min@1[
+  (
+    SELECT ELEMENT [
+    FieldAccessor [
+      FieldAccessor [
+        Variable [ Name=#3 ]
+        Field=tenk
+      ]
+      Field=two
+    ]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#3 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#3 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+min2
+FunctionCall asterix.sql-max@1[
+  (
+    SELECT ELEMENT [
+    FieldAccessor [
+      FieldAccessor [
+        Variable [ Name=#4 ]
+        Field=tenk
+      ]
+      Field=two
+    ]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#4 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#4 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+max2
+FunctionCall asterix.sql-sum@1[
+  (
+    SELECT ELEMENT [
+    FieldAccessor [
+      FieldAccessor [
+        Variable [ Name=#5 ]
+        Field=tenk
+      ]
+      Field=twenty
+    ]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#5 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#5 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+sum20
+]
+FROM [  FunctionCall asterix.dataset@1[
+    LiteralExpr [STRING] [test.tenk]
+  ]
+  AS Variable [ Name=$tenk ]
+]
+Group All
+  GROUP AS Variable [ Name=#1 ]
+  (
+    tenk:=Variable [ Name=$tenk ]
+  )
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_1.10.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_1.10.ast
new file mode 100644
index 0000000..e82d78e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/aggregate-subclause/agg_filter_01/agg_filter_1.10.ast
@@ -0,0 +1,137 @@
+DataverseUse test
+Query:
+SELECT [
+Variable [ Name=$ten ]
+ten
+FunctionCall asterix.sql-count@1[
+  (
+    SELECT ELEMENT [
+    LiteralExpr [LONG] [1]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#2 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#2 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+cnt
+FunctionCall asterix.sql-min@1[
+  (
+    SELECT ELEMENT [
+    FieldAccessor [
+      FieldAccessor [
+        Variable [ Name=#3 ]
+        Field=tenk
+      ]
+      Field=two
+    ]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#3 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#3 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+min2
+FunctionCall asterix.sql-max@1[
+  (
+    SELECT ELEMENT [
+    FieldAccessor [
+      FieldAccessor [
+        Variable [ Name=#4 ]
+        Field=tenk
+      ]
+      Field=two
+    ]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#4 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#4 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+max2
+FunctionCall asterix.sql-sum@1[
+  (
+    SELECT ELEMENT [
+    FieldAccessor [
+      FieldAccessor [
+        Variable [ Name=#5 ]
+        Field=tenk
+      ]
+      Field=twenty
+    ]
+    ]
+    FROM [      Variable [ Name=#1 ]
+      AS Variable [ Name=#5 ]
+    ]
+    Where
+      OperatorExpr [
+        FieldAccessor [
+          FieldAccessor [
+            Variable [ Name=#5 ]
+            Field=tenk
+          ]
+          Field=four
+        ]
+        >
+        LiteralExpr [LONG] [0]
+      ]
+  )
+]
+sum20
+]
+FROM [  FunctionCall asterix.dataset@1[
+    LiteralExpr [STRING] [test.tenk]
+  ]
+  AS Variable [ Name=$tenk ]
+]
+Groupby
+  Variable [ Name=$ten ]
+  :=
+  FieldAccessor [
+    Variable [ Name=$tenk ]
+    Field=ten
+  ]
+  GROUP AS Variable [ Name=#1 ]
+  (
+    tenk:=Variable [ Name=$tenk ]
+  )
+
+Orderby
+  Variable [ Name=$ten ]
+  ASC
+
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DataverseName.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DataverseName.java
index 0eb3b5d..b8124a2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DataverseName.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DataverseName.java
@@ -42,9 +42,8 @@
  * <p>
  * E.g. the canonical form for a dataverse name {@code ["a", "b", "c"]} is {@code "a.b.c"}
  * <p>
- * {@link #toString()} returns a display form which is a {@link #CANONICAL_FORM_SEPARATOR_CHAR '.'} separated
- * concatenation of name parts without escaping. In general it's impossible to reconstruct a dataverse name from
- * its display form.
+ * {@link #toString()} returns a display form which is suitable for error messages,
+ * and is a valid SQL++ multi-part identifier parsable by {@code IParser#parseMultipartIdentifier()}
  * <p>
  * Notes:
  * <li>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
index 6a2a333..e4f7790 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -37,7 +37,13 @@
 
     public static void mapVariablesTopDown(ILogicalOperator op, ILogicalOperator arg,
             Map<LogicalVariable, LogicalVariable> variableMapping) throws AlgebricksException {
-        IsomorphismVariableMappingVisitor visitor = new IsomorphismVariableMappingVisitor(variableMapping);
+        mapVariablesTopDown(op, arg, variableMapping, true);
+    }
+
+    public static void mapVariablesTopDown(ILogicalOperator op, ILogicalOperator arg,
+            Map<LogicalVariable, LogicalVariable> variableMapping, boolean goThroughNts) throws AlgebricksException {
+        IsomorphismVariableMappingVisitor visitor =
+                new IsomorphismVariableMappingVisitor(variableMapping, goThroughNts);
         op.accept(visitor, arg);
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index fe794c8..5cf0b5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -76,11 +76,14 @@
 
 public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisitor<Void, ILogicalOperator> {
 
-    private final Map<ILogicalOperator, Set<ILogicalOperator>> alreadyMapped = new HashMap<>();
+    private final Map<ILogicalOperator, Set<ILogicalOperator>> alreadyMapped;
     private final Map<LogicalVariable, LogicalVariable> variableMapping;
+    private final boolean goThroughNts;
 
-    IsomorphismVariableMappingVisitor(Map<LogicalVariable, LogicalVariable> variableMapping) {
+    IsomorphismVariableMappingVisitor(Map<LogicalVariable, LogicalVariable> variableMapping, boolean goThroughNts) {
         this.variableMapping = variableMapping;
+        this.goThroughNts = goThroughNts;
+        this.alreadyMapped = goThroughNts ? new HashMap<>() : null;
     }
 
     @Override
@@ -145,6 +148,9 @@
         if (op.getOperatorTag() != arg.getOperatorTag()) {
             return null;
         }
+        if (!goThroughNts) {
+            return null;
+        }
         Set<ILogicalOperator> mappedOps = alreadyMapped.get(op);
         if (mappedOps != null && mappedOps.contains(arg)) {
             return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index dd7bc34..059a357 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -131,7 +131,7 @@
      *            a typing context that keeps track of types of each variable.
      * @throws AlgebricksException
      */
-    public static void substituteVariables(ILogicalOperator op, LinkedHashMap<LogicalVariable, LogicalVariable> varMap,
+    public static void substituteVariables(ILogicalOperator op, Map<LogicalVariable, LogicalVariable> varMap,
             ITypingContext ctx) throws AlgebricksException {
         for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
             VariableUtilities.substituteVariables(op, entry.getKey(), entry.getValue(), ctx);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateIsomorphicSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateIsomorphicSubplanRule.java
index 1410a3a..7859f92 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateIsomorphicSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateIsomorphicSubplanRule.java
@@ -1,4 +1,5 @@
 /*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,70 +20,135 @@
 
 package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
- * <pre>
- * This rules searches for
- *   SUBPLAN_1 { v1 = ... } (directly followed by)
- *   SUBPLAN_2 { v2 = ... }
- * and eliminates nested plans from subplan_1 that are isomorphic to nested plans defined by subplan_2.
- * The variables produced by eliminated nested plans are then ASSIGN-ed to variables produced by
- * matching nested plans in subplan_2:
- *   ASSIGN { v1 = v2 }
- *   SUBPLAN_2 { v2 = ...}
+ * This rule consolidates two subplan operators into a single subplan operator.
+ * It searches for two adjacent subplan operators in the plan
  *
- * Note: SUBPLAN_1 will remain in the plan (below ASSIGN) if some of its nested plans could not be eliminated.
+ * <pre>
+ * SUBPLAN_1 {
+ *   AGGREGATE_1 [v1=...]
+ *   ASSIGN_i_j (zero or more)
+ *   rest_ops_1
+ * }
+ * SUBPLAN_2 {
+ *   AGGREGATE_2 [v2=...]
+ *   ASSIGN_m_n (zero or more)
+ *   rest_ops_2
+ * }
+ * </pre>
+ *
+ * If {@code rest_ops_1} segment is isomorphic with {@code rest_ops_2} segment then
+ * this rule consolidates both subplans into a single (lower) one.
+ * Variables produced {@code rest_ops_1} and used by AGGREGATE_1 / ASSIGN_1_i
+ * are replaced with variables produced by {@code rest_ops_2}
+ *
+ * <pre>
+ * SUBPLAN_2 {
+ *   AGGREGATE [v1=..., v2=...]
+ *   ASSIGN_i_j (zero or more)
+ *   ASSIGN_m_n (zero or more)
+ *   rest_ops_2
+ * }
+ * </pre>
+ *
+ * Note: this rule keeps {@code SUBPLAN_1} if it had several nested plans and
+ * some of those nested plans could not be moved into the lower subplan operator.
  * </pre>
  */
-public class EliminateIsomorphicSubplanRule implements IAlgebraicRewriteRule {
+public final class EliminateIsomorphicSubplanRule implements IAlgebraicRewriteRule {
+
+    private List<AggregateOperator> targetSubplan1Roots;
+
+    private List<AggregateOperator> targetSubplan2Roots;
+
+    private List<Map<LogicalVariable, LogicalVariable>> targetVarMaps;
+
+    private Map<LogicalVariable, LogicalVariable> tmpVarMap;
+
+    private Mutable<AggregateOperator> tmpAggOpRef;
+
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
-        if (op1.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+        if (opRef.getValue().getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
             return false;
         }
-        SubplanOperator subplan1 = (SubplanOperator) op1;
+        if (context.checkIfInDontApplySet(this, opRef.getValue())) {
+            return false;
+        }
 
+        boolean applied = false;
+        for (;;) {
+            context.addToDontApplySet(this, opRef.getValue());
+            Pair<Boolean, Mutable<ILogicalOperator>> p = mergeSubplanIntoChildSubplan(opRef, context);
+            if (p == null) {
+                break;
+            }
+            applied |= p.first;
+            opRef = p.second;
+        }
+
+        return applied;
+    }
+
+    /**
+     * Returns {@code null} if given operator's child operator is not a SUBPLAN.
+     * Otherwise attempts to merge the given SUBPLAN into its child and returns a pair of values.
+     * The first value in the pair is a boolean indicating whether the rewriting succeeded or not,
+     * the second value is a reference to the lower subplan (always returned even if the rewriting did not happen)
+     */
+    private Pair<Boolean, Mutable<ILogicalOperator>> mergeSubplanIntoChildSubplan(Mutable<ILogicalOperator> op1Ref,
+            IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op1Ref.getValue();
+        SubplanOperator subplan1 = (SubplanOperator) op1;
         Mutable<ILogicalOperator> op2Ref = subplan1.getInputs().get(0);
         ILogicalOperator op2 = op2Ref.getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
-            return false;
+            return null;
         }
         SubplanOperator subplan2 = (SubplanOperator) op2;
 
-        Map<LogicalVariable, LogicalVariable> assignVarMap = new LinkedHashMap<>();
-        Map<LogicalVariable, LogicalVariable> tmpVarMap = new LinkedHashMap<>();
-        List<LogicalVariable> tmpVarList = new ArrayList<>();
+        reset();
 
         for (Iterator<ILogicalPlan> nestedPlanIter = subplan1.getNestedPlans().iterator(); nestedPlanIter.hasNext();) {
             ILogicalPlan nestedPlan = nestedPlanIter.next();
             for (Iterator<Mutable<ILogicalOperator>> rootOpIter = nestedPlan.getRoots().iterator(); rootOpIter
                     .hasNext();) {
                 ILogicalOperator rootOp = rootOpIter.next().getValue();
-                if (findIsomorphicPlanRoot(rootOp, subplan2, assignVarMap, tmpVarList, tmpVarMap)) {
+                if (findIsomorphicPlanSegment(rootOp, subplan2, tmpAggOpRef, tmpVarMap)) {
+                    targetSubplan1Roots.add((AggregateOperator) rootOp);
+                    targetSubplan2Roots.add(Objects.requireNonNull(tmpAggOpRef.getValue()));
+                    targetVarMaps.add(new HashMap<>(tmpVarMap));
                     rootOpIter.remove();
                 }
             }
@@ -91,72 +157,155 @@
             }
         }
 
-        int assignVarCount = assignVarMap.size();
-        if (assignVarCount == 0) {
-            return false;
+        if (targetSubplan1Roots.isEmpty()) {
+            return new Pair<>(false, op2Ref);
         }
 
-        List<LogicalVariable> assignVars = new ArrayList<>(assignVarCount);
-        List<Mutable<ILogicalExpression>> assignExprs = new ArrayList<>(assignVarCount);
-
-        for (Map.Entry<LogicalVariable, LogicalVariable> me : assignVarMap.entrySet()) {
-            LogicalVariable subplan1Var = me.getKey();
-
-            LogicalVariable subplan2Var = me.getValue();
-            VariableReferenceExpression subplan2VarRef = new VariableReferenceExpression(subplan2Var);
-            subplan2VarRef.setSourceLocation(subplan2.getSourceLocation());
-
-            assignVars.add(subplan1Var);
-            assignExprs.add(new MutableObject<>(subplan2VarRef));
+        for (int i = 0, n = targetSubplan1Roots.size(); i < n; i++) {
+            AggregateOperator targetSubplan1Root = targetSubplan1Roots.get(i);
+            AggregateOperator targetSubplan2Root = targetSubplan2Roots.get(i);
+            Map<LogicalVariable, LogicalVariable> targetVarMap = targetVarMaps.get(i);
+            consolidateSubplans(targetSubplan1Root, targetSubplan2Root, targetVarMap, context);
         }
 
-        Mutable<ILogicalOperator> assignInputOp;
+        context.computeAndSetTypeEnvironmentForOperator(subplan2);
+
         if (subplan1.getNestedPlans().isEmpty()) {
-            assignInputOp = op2Ref;
+            // remove subplan1 from the tree
+            op1Ref.setValue(subplan2);
+            return new Pair<>(true, op1Ref);
         } else {
             // some nested plans were removed from subplan1 -> recompute its type environment
             context.computeAndSetTypeEnvironmentForOperator(subplan1);
-            assignInputOp = new MutableObject<>(subplan1);
+            return new Pair<>(true, op2Ref);
         }
-
-        AssignOperator assignOp = new AssignOperator(assignVars, assignExprs);
-        assignOp.setSourceLocation(subplan1.getSourceLocation());
-        assignOp.getInputs().add(assignInputOp);
-
-        context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        opRef.setValue(assignOp);
-
-        return true;
     }
 
     /**
      * Finds nested plan root in given subplan that is isomorphic to given operator
      * and returns their variable mappings
      */
-    private boolean findIsomorphicPlanRoot(ILogicalOperator op, SubplanOperator subplanOp,
-            Map<LogicalVariable, LogicalVariable> outVarMap, List<LogicalVariable> tmpVarList,
-            Map<LogicalVariable, LogicalVariable> tmpVarMap) throws AlgebricksException {
+    private static boolean findIsomorphicPlanSegment(ILogicalOperator op, SubplanOperator subplanOp,
+            Mutable<AggregateOperator> outSubplanRootOpRef, Map<LogicalVariable, LogicalVariable> outVarMap)
+            throws AlgebricksException {
+        if (op.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+            return false;
+        }
+        AggregateOperator aggOp = (AggregateOperator) op;
+        if (aggOp.getMergeExpressions() != null) {
+            return false;
+        }
+
+        Set<LogicalVariable> freeVars = new ListSet<>();
+        OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(aggOp, freeVars);
+
+        // find first non-ASSIGN child. It'll be the root for the isomorphic segment search.
+        ILogicalOperator opChildIsomorphicCandidate = aggOp.getInputs().get(0).getValue();
+        while (opChildIsomorphicCandidate.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            if (!OperatorPropertiesUtil.isMovable(opChildIsomorphicCandidate)) {
+                return false;
+            }
+            opChildIsomorphicCandidate = opChildIsomorphicCandidate.getInputs().get(0).getValue();
+        }
+
         for (ILogicalPlan nestedPlan : subplanOp.getNestedPlans()) {
             for (Mutable<ILogicalOperator> rootOpRef : nestedPlan.getRoots()) {
                 ILogicalOperator rootOp = rootOpRef.getValue();
-                if (IsomorphismUtilities.isOperatorIsomorphicPlanSegment(op, rootOp)) {
-                    tmpVarList.clear();
-                    VariableUtilities.getProducedVariables(op, tmpVarList);
-                    tmpVarMap.clear();
-                    IsomorphismUtilities.mapVariablesTopDown(rootOp, op, tmpVarMap);
-                    tmpVarMap.keySet().retainAll(tmpVarList);
-                    if (tmpVarMap.size() == tmpVarList.size()) {
-                        outVarMap.putAll(tmpVarMap);
-                        return true;
-                    } else {
-                        return false;
-                    }
+                if (rootOp.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
+                    continue;
+                }
+                AggregateOperator aggRootOp = (AggregateOperator) rootOp;
+                if (aggRootOp.getMergeExpressions() != null) {
+                    continue;
+                }
+                if (!OperatorPropertiesUtil.disjoint(freeVars, aggRootOp.getVariables())) {
+                    // upper subplan uses variables from this subplan -> exit
+                    continue;
+                }
+
+                // find first non-ASSIGN child. It'll be the root for the isomorphic segment search.
+                ILogicalOperator rootOpChildIsomorphicCandidate = aggRootOp.getInputs().get(0).getValue();
+                while (rootOpChildIsomorphicCandidate.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+                    rootOpChildIsomorphicCandidate = rootOpChildIsomorphicCandidate.getInputs().get(0).getValue();
+                }
+
+                if (IsomorphismUtilities.isOperatorIsomorphicPlanSegment(opChildIsomorphicCandidate,
+                        rootOpChildIsomorphicCandidate)) {
+                    outSubplanRootOpRef.setValue(aggRootOp);
+                    IsomorphismUtilities.mapVariablesTopDown(rootOpChildIsomorphicCandidate, opChildIsomorphicCandidate,
+                            outVarMap, false);
+                    return true;
                 }
             }
         }
         return false;
     }
 
+    private static void consolidateSubplans(AggregateOperator upperAggRootOp, AggregateOperator targetAggRootOp,
+            Map<LogicalVariable, LogicalVariable> varMap, IOptimizationContext context) throws AlgebricksException {
+        ILogicalOperator upperChildOp = upperAggRootOp.getInputs().get(0).getValue();
+        if (upperChildOp.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+            Deque<AssignOperator> upperAssignQueue = new ArrayDeque<>();
+            do {
+                upperAssignQueue.push((AssignOperator) upperChildOp);
+                upperChildOp = upperChildOp.getInputs().get(0).getValue();
+            } while (upperChildOp.getOperatorTag() == LogicalOperatorTag.ASSIGN);
+
+            ILogicalOperator targetChildOp = targetAggRootOp.getInputs().get(0).getValue();
+
+            AssignOperator upperAssignOp;
+            while ((upperAssignOp = upperAssignQueue.poll()) != null) {
+                AssignOperator upperAssignOpCopy = (AssignOperator) OperatorManipulationUtil.deepCopy(upperAssignOp);
+                upperAssignOpCopy.getInputs().clear();
+                VariableUtilities.substituteVariables(upperAssignOpCopy, varMap, null);
+
+                upperAssignOpCopy.getInputs().add(new MutableObject<>(targetChildOp));
+                context.computeAndSetTypeEnvironmentForOperator(upperAssignOpCopy);
+                targetChildOp = upperAssignOpCopy;
+            }
+
+            targetAggRootOp.getInputs().clear();
+            targetAggRootOp.getInputs().add(new MutableObject<>(targetChildOp));
+        }
+
+        AggregateOperator upperAggRootOpCopy = (AggregateOperator) OperatorManipulationUtil.deepCopy(upperAggRootOp);
+        upperAggRootOpCopy.getInputs().clear();
+        VariableUtilities.substituteVariables(upperAggRootOpCopy, varMap, null);
+
+        targetAggRootOp.getVariables().addAll(upperAggRootOpCopy.getVariables());
+        targetAggRootOp.getExpressions().addAll(upperAggRootOpCopy.getExpressions());
+
+        context.computeAndSetTypeEnvironmentForOperator(targetAggRootOp);
+    }
+
+    private void reset() {
+        if (targetSubplan1Roots == null) {
+            targetSubplan1Roots = new ArrayList<>();
+        } else {
+            targetSubplan1Roots.clear();
+        }
+        if (targetSubplan2Roots == null) {
+            targetSubplan2Roots = new ArrayList<>();
+        } else {
+            targetSubplan2Roots.clear();
+        }
+        if (targetVarMaps == null) {
+            targetVarMaps = new ArrayList<>();
+        } else {
+            targetVarMaps.clear();
+        }
+        if (tmpVarMap == null) {
+            tmpVarMap = new HashMap<>();
+        } else {
+            tmpVarMap.clear();
+        }
+        if (tmpAggOpRef == null) {
+            tmpAggOpRef = new MutableObject<>();
+        } else {
+            tmpAggOpRef.setValue(null);
+        }
+    }
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {