Merge branch 'gerrit/cheshire-cat'

Change-Id: Idf8b88e7282c247f7a6355f2056c1d61685fd2a2
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveLeftOuterUnnestForLeftOuterJoinRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveLeftOuterUnnestForLeftOuterJoinRule.java
index 364816b..775a1df 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveLeftOuterUnnestForLeftOuterJoinRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveLeftOuterUnnestForLeftOuterJoinRule.java
@@ -91,7 +91,8 @@
         Triple<Boolean, ILogicalExpression, ILogicalExpression> checkGbyResult =
                 checkUnnestAndGby(outerUnnest, gbyOperator);
         // The argument for listify and not(is-missing(...)) check should be variables.
-        if (!isVariableReference(checkGbyResult.second) || !isVariableReference(checkGbyResult.third)) {
+        if (!checkGbyResult.first || checkGbyResult.second == null || !isVariableReference(checkGbyResult.second)
+                || checkGbyResult.third == null || !isVariableReference(checkGbyResult.third)) {
             return false;
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 78c4c5e..f7cbb9d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -626,10 +626,11 @@
     @Override
     public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
         visitSingleInputOperator(op);
-        List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
         for (LogicalVariable keyVar : correlatedKeyVars) {
-            if (!distinctVarList.contains(keyVar)) {
-                distinctVarList.add(keyVar);
+            if (!op.isDistinctByVar(keyVar)) {
+                VariableReferenceExpression keyVarRef = new VariableReferenceExpression(keyVar);
+                keyVarRef.setSourceLocation(op.getSourceLocation());
+                op.getExpressions().add(new MutableObject<>(keyVarRef));
             }
         }
         context.computeAndSetTypeEnvironmentForOperator(op);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index 12596ff..8326f85 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -361,10 +361,11 @@
         if (!rewritten || !underJoin) {
             return op;
         }
-        List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
         for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
-            if (!distinctVarList.contains(keyVar)) {
-                distinctVarList.add(keyVar);
+            if (!op.isDistinctByVar(keyVar)) {
+                VariableReferenceExpression keyVarRef = new VariableReferenceExpression(keyVar);
+                keyVarRef.setSourceLocation(op.getSourceLocation());
+                op.getExpressions().add(new MutableObject<>(keyVarRef));
             }
         }
         context.computeAndSetTypeEnvironmentForOperator(op);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index 62f89f1..e7de31d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -291,6 +291,10 @@
         if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
             return changedAndVarMap;
         }
+        SubplanOperator subplanOp = (SubplanOperator) op;
+        if (subplanOp.getNumberOfRoots() != 1) {
+            return changedAndVarMap;
+        }
 
         /**
          * Apply the special join-based rewriting.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index 288b01a..a0bb4b6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -220,7 +220,7 @@
 
     @Override
     public Boolean visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
-        return visitInputs(op);
+        return visitTupleDiscardingOperator(op);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index e14fd02..baf083d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -128,18 +128,19 @@
     public static final String PREFIX_INTERNAL_PARAMETERS = "_internal";
 
     // A white list of supported configurable parameters.
-    private static final Set<String> CONFIGURABLE_PARAMETER_NAMES =
-            ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
-                    CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
-                    CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
-                    CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY,
-                    CompilerProperties.COMPILER_INDEXONLY_KEY, CompilerProperties.COMPILER_INTERNAL_SANITYCHECK_KEY,
-                    CompilerProperties.COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
-                    FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
-                    StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
-                    FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
-                    SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
-                    DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION);
+    private static final Set<String> CONFIGURABLE_PARAMETER_NAMES = ImmutableSet.of(
+            CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY,
+            CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+            CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY,
+            CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY,
+            CompilerProperties.COMPILER_INDEXONLY_KEY, CompilerProperties.COMPILER_INTERNAL_SANITYCHECK_KEY,
+            CompilerProperties.COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY, CompilerProperties.COMPILER_SUBPLAN_MERGE_KEY,
+            CompilerProperties.COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS,
+            FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME,
+            StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME,
+            FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION,
+            SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type",
+            DisjunctivePredicateToJoinRule.REWRITE_OR_AS_JOIN_OPTION);
 
     private final IRewriterFactory rewriterFactory;
     private final IAstPrintVisitorFactory astPrintVisitorFactory;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_3.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_3.sqlpp
new file mode 100644
index 0000000..95cc0fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_3.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create dataset cart(TestType) primary key id;
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+where i1.ts >= 2000 and i1.pid in
+(
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_4.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_4.sqlpp
new file mode 100644
index 0000000..58e3a61
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_4.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create dataset cart(TestType) primary key id;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- set `compiler.subplan.merge` "true";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_5.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_5.sqlpp
new file mode 100644
index 0000000..8419f59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_5.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create dataset cart(TestType) primary key id;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- and hash-bcast join hint
+--- set `compiler.subplan.merge` "true";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid /* +hash-bcast */ in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_6.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_6.sqlpp
new file mode 100644
index 0000000..d20db85
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_6.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create dataset cart(TestType) primary key id;
+
+--- test with subplan into subplan pushdown disabled
+set `compiler.subplan.merge` "false";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_7.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_7.sqlpp
new file mode 100644
index 0000000..51ec2ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/in_let_7.sqlpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create dataset cart(TestType) primary key id;
+
+--- test with nested subplan pushdown disabled
+--- (in this case same result as in compiler.subplan.merge=false)
+set `compiler.subplan.nestedpushdown` "false";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/query-ASTERIXDB-2815.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/query-ASTERIXDB-2815.sqlpp
new file mode 100644
index 0000000..9fd4e02
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/subquery/query-ASTERIXDB-2815.sqlpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type t1 as {
+ _id: uuid
+};
+
+create dataset RawTweet(t1) primary key _id autogenerated;
+
+create dataset Evidence(t1) primary key _id autogenerated;
+
+create dataset Verification(t1) primary key _id autogenerated;
+
+select t.id, urls
+from RawTweet t
+let urls = (
+  select distinct value e.url
+  from Verification v, v.evidence ve, Evidence e
+  where t.id = v.tweet_id and ve = e.ev_id
+)
+where array_count(urls) > 2
+order by t.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-2.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-2.sqlpp
new file mode 100644
index 0000000..8438976
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-2.sqlpp
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpcds if exists;
+create dataverse tpcds;
+
+use tpcds;
+
+create type tpcds.store_sales_type as closed {
+    ss_sold_date_sk:           bigint?,
+    ss_sold_time_sk:           bigint?,
+    ss_item_sk:                bigint,
+    ss_customer_sk:            bigint?,
+    ss_cdemo_sk:               bigint?,
+    ss_hdemo_sk:               bigint?,
+    ss_addr_sk:                bigint?,
+    ss_store_sk:               bigint?,
+    ss_promo_sk:               bigint?,
+    ss_ticket_number:          bigint,
+    ss_quantity:               bigint?,
+    ss_wholesale_cost:         double?,
+    ss_list_price:             double?,
+    ss_sales_price:            double?,
+    ss_ext_discount_amt:       double?,
+    ss_ext_sales_price:        double?,
+    ss_ext_wholesale_cost:     double?,
+    ss_ext_list_price:         double?,
+    ss_ext_tax:                double?,
+    ss_coupon_amt:             double?,
+    ss_net_paid:               double?,
+    ss_net_paid_inc_tax:       double?,
+    ss_net_profit:             double?
+};
+
+
+create type tpcds.item_type as closed {
+    i_item_sk:                 bigint,
+    i_item_id:                 string,
+    i_rec_start_date:          string?,
+    i_rec_end_date:            string?,
+    i_item_desc:               string?,
+    i_current_price:           double?,
+    i_wholesale_cost:          double?,
+    i_brand_id:                bigint? ,
+    i_brand:                   string?,
+    i_class_id:                bigint? ,
+    i_class:                   string?,
+    i_category_id:             bigint? ,
+    i_category:                string?,
+    i_manufact_id:             bigint? ,
+    i_manufact:                string?,
+    i_size:                    string?,
+    i_formulation:             string?,
+    i_color:                   string?,
+    i_units:                   string?,
+    i_container:               string?,
+    i_manager_id:              bigint?,
+    i_product_name:            string?
+};
+
+create dataset store_sales (store_sales_type)
+primary key ss_item_sk, ss_ticket_number;
+
+create dataset item (item_type)
+primary key i_item_sk;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- set `compiler.subplan.merge` "true";
+
+select case when (select value count(ss)
+                  from store_sales ss
+                  where ss_quantity >= 1 and ss_quantity <= 20)[0] < 25437
+            then (select avg(ss_ext_discount_amt)
+                  from store_sales
+                  where ss_quantity >= 1 and ss_quantity <= 20)
+            else (select avg(ss_net_profit)
+                  from store_sales
+                  where ss_quantity >= 1 and ss_quantity <= 20)
+            end bucket1
+from item
+where i_item_sk = 1;
+
+drop dataverse tpcds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated-2.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated-2.sqlpp
new file mode 100644
index 0000000..80fe87c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated-2.sqlpp
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpcds if exists;
+create dataverse tpcds;
+
+use tpcds;
+
+create type tpcds.store_sales_type as closed {
+    ss_sold_date_sk:           bigint?,
+    ss_sold_time_sk:           bigint?,
+    ss_item_sk:                bigint,
+    ss_customer_sk:            bigint?,
+    ss_cdemo_sk:               bigint?,
+    ss_hdemo_sk:               bigint?,
+    ss_addr_sk:                bigint?,
+    ss_store_sk:               bigint?,
+    ss_promo_sk:               bigint?,
+    ss_ticket_number:          bigint,
+    ss_quantity:               bigint?,
+    ss_wholesale_cost:         double?,
+    ss_list_price:             double?,
+    ss_sales_price:            double?,
+    ss_ext_discount_amt:       double?,
+    ss_ext_sales_price:        double?,
+    ss_ext_wholesale_cost:     double?,
+    ss_ext_list_price:         double?,
+    ss_ext_tax:                double?,
+    ss_coupon_amt:             double?,
+    ss_net_paid:               double?,
+    ss_net_paid_inc_tax:       double?,
+    ss_net_profit:             double?
+};
+
+
+create type tpcds.item_type as closed {
+    i_item_sk:                 bigint,
+    i_item_id:                 string,
+    i_rec_start_date:          string?,
+    i_rec_end_date:            string?,
+    i_item_desc:               string?,
+    i_current_price:           double?,
+    i_wholesale_cost:          double?,
+    i_brand_id:                bigint? ,
+    i_brand:                   string?,
+    i_class_id:                bigint? ,
+    i_class:                   string?,
+    i_category_id:             bigint? ,
+    i_category:                string?,
+    i_manufact_id:             bigint? ,
+    i_manufact:                string?,
+    i_size:                    string?,
+    i_formulation:             string?,
+    i_color:                   string?,
+    i_units:                   string?,
+    i_container:               string?,
+    i_manager_id:              bigint?,
+    i_product_name:            string?
+};
+
+create dataset store_sales (store_sales_type)
+primary key ss_item_sk, ss_ticket_number;
+
+create dataset item (item_type)
+primary key i_item_sk;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- set `compiler.subplan.merge` "true";
+
+select case when (select value count(ss)
+                  from store_sales ss
+                  where ss_quantity = item.i_item_sk)[0] < 25437
+            then (select avg(ss_ext_discount_amt)
+                  from store_sales
+                  where ss_quantity = item.i_item_sk)
+            else (select avg(ss_net_profit)
+                  from store_sales
+                  where ss_quantity = item.i_item_sk)
+            end bucket1
+from item
+where i_item_sk = 1;
+
+drop dataverse tpcds;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated.sqlpp
index a41f864..9284080 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581-correlated.sqlpp
@@ -80,6 +80,9 @@
 create dataset item (item_type)
 primary key i_item_sk;
 
+--- test with subplan into subplan pushdown disabled
+set `compiler.subplan.merge` "false";
+
 select case when (select value count(ss)
                   from store_sales ss
                   where ss_quantity = item.i_item_sk)[0] < 25437
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581.sqlpp
index 5c94fcf..ed5b4b0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/tpcds/query-ASTERIXDB-1581.sqlpp
@@ -80,6 +80,9 @@
 create dataset item (item_type)
 primary key i_item_sk;
 
+--- test with subplan into subplan pushdown disabled
+set `compiler.subplan.merge` "false";
+
 select case when (select value count(ss)
                   from store_sales ss
                   where ss_quantity >= 1 and ss_quantity <= 20)[0] < 25437
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
index 7af2f40..8985321 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$cntrycode(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$185]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$188]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
               -- SORT_GROUP_BY[$$162]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,25 +21,25 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- SORT_GROUP_BY[$$182]  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$185]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_EXCHANGE [$$182]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$176]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- STREAM_SELECT  |LOCAL|
                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$176(ASC)]  |PARTITIONED|
+                                      -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- HYBRID_HASH_JOIN [$$171][$$168]  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$174][$$171]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$174]  |PARTITIONED|
                                                   -- ASSIGN  |PARTITIONED|
                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -66,7 +66,7 @@
                                                                                 -- DATASOURCE_SCAN (test.Customer)  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$168]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
                                                   -- ASSIGN  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- DATASOURCE_SCAN (test.Orders)  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan
index e9d78ee..e98c53a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$185]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$188]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$162]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -27,25 +27,25 @@
                                     -- STREAM_SELECT  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- SORT_GROUP_BY[$$182]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$185]  |PARTITIONED|
                                                   {
                                                     -- AGGREGATE  |LOCAL|
                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                   }
-                                            -- HASH_PARTITION_EXCHANGE [$$182]  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$176]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                              -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
                                                       {
                                                         -- AGGREGATE  |LOCAL|
                                                           -- STREAM_SELECT  |LOCAL|
                                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                       }
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$176(ASC)]  |PARTITIONED|
+                                                  -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$171][$$168]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
+                                                          -- HYBRID_HASH_JOIN [$$174][$$171]  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$174]  |PARTITIONED|
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -72,7 +72,7 @@
                                                                                             -- DATASOURCE_SCAN (test.Customer)  |PARTITIONED|
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$168]  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                   -- DATASOURCE_SCAN (test.Orders)  |PARTITIONED|
@@ -86,12 +86,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$185]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$188]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$162]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
@@ -104,25 +104,25 @@
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$182]  |PARTITIONED|
+                                                    -- SORT_GROUP_BY[$$185]  |PARTITIONED|
                                                             {
                                                               -- AGGREGATE  |LOCAL|
                                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                             }
-                                                      -- HASH_PARTITION_EXCHANGE [$$182]  |PARTITIONED|
-                                                        -- PRE_CLUSTERED_GROUP_BY[$$176]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$185]  |PARTITIONED|
+                                                        -- PRE_CLUSTERED_GROUP_BY[$$179]  |PARTITIONED|
                                                                 {
                                                                   -- AGGREGATE  |LOCAL|
                                                                     -- STREAM_SELECT  |LOCAL|
                                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                                 }
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STABLE_SORT [$$176(ASC)]  |PARTITIONED|
+                                                            -- STABLE_SORT [$$179(ASC)]  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- HYBRID_HASH_JOIN [$$171][$$168]  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
+                                                                    -- HYBRID_HASH_JOIN [$$174][$$171]  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$174]  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -149,7 +149,7 @@
                                                                                                       -- DATASOURCE_SCAN (test.Customer)  |PARTITIONED|
                                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$168]  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                             -- DATASOURCE_SCAN (test.Orders)  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_3.plan
new file mode 100644
index 0000000..3464eae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_3.plan
@@ -0,0 +1,74 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$101(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$101(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- PRE_CLUSTERED_GROUP_BY[$$104]  |PARTITIONED|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- STREAM_SELECT  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STABLE_SORT [$$104(ASC)]  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$104]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$97][$$pid]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- UNNEST  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- STREAM_SELECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$108]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- HASH_PARTITION_EXCHANGE [$$108]  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$92]  |PARTITIONED|
+                                                          {
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                          }
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- UNNEST  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- REPLICATE  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_4.plan
new file mode 100644
index 0000000..a500b92
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_4.plan
@@ -0,0 +1,74 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$120(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$120(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- PRE_CLUSTERED_GROUP_BY[$$123]  |PARTITIONED|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- STREAM_SELECT  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STABLE_SORT [$$123(ASC)]  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$116][$$108]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$116]  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- UNNEST  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- STREAM_SELECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$127]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- HASH_PARTITION_EXCHANGE [$$127]  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$110]  |PARTITIONED|
+                                                          {
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                          }
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- UNNEST  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- REPLICATE  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_5.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_5.plan
new file mode 100644
index 0000000..88cacfa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_5.plan
@@ -0,0 +1,74 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$120(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$120(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- PRE_CLUSTERED_GROUP_BY[$$123]  |PARTITIONED|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- STREAM_SELECT  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STABLE_SORT [$$123(ASC)]  |PARTITIONED|
+                            -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$116][$$108]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- UNNEST  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- STREAM_SELECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- SORT_GROUP_BY[$$127]  |PARTITIONED|
+                                                      {
+                                                        -- AGGREGATE  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                      }
+                                                -- HASH_PARTITION_EXCHANGE [$$127]  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$110]  |PARTITIONED|
+                                                          {
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                          }
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- UNNEST  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- REPLICATE  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_6.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_6.plan
new file mode 100644
index 0000000..512a0e7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_6.plan
@@ -0,0 +1,70 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$111(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$111(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- SUBPLAN  |PARTITIONED|
+                            {
+                              -- AGGREGATE  |LOCAL|
+                                -- STREAM_SELECT  |LOCAL|
+                                  -- UNNEST  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                            }
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- NESTED_LOOP  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- UNNEST  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            -- AGGREGATE  |UNPARTITIONED|
+                              -- SORT_MERGE_EXCHANGE [$$pid(ASC) ]  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$117]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- HASH_PARTITION_EXCHANGE [$$117]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$102]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- STREAM_SELECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- UNNEST  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_7.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_7.plan
new file mode 100644
index 0000000..512a0e7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/in_let_7.plan
@@ -0,0 +1,70 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$111(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$111(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- SUBPLAN  |PARTITIONED|
+                            {
+                              -- AGGREGATE  |LOCAL|
+                                -- STREAM_SELECT  |LOCAL|
+                                  -- UNNEST  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                            }
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- NESTED_LOOP  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- UNNEST  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            -- AGGREGATE  |UNPARTITIONED|
+                              -- SORT_MERGE_EXCHANGE [$$pid(ASC) ]  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$117]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- HASH_PARTITION_EXCHANGE [$$117]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$102]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- STREAM_SELECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- UNNEST  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.cart)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
index afdc38c..0120576 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$cntrycode(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$186]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$189]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$189]  |PARTITIONED|
               -- SORT_GROUP_BY[$$163]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,25 +21,25 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- SORT_GROUP_BY[$$183]  |PARTITIONED|
+                              -- SORT_GROUP_BY[$$186]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_EXCHANGE [$$183]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$177]  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$180]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
                                               -- STREAM_SELECT  |LOCAL|
                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           }
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$177(ASC)]  |PARTITIONED|
+                                      -- STABLE_SORT [$$180(ASC)]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- HYBRID_HASH_JOIN [$$172][$$169]  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$175][$$172]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$175]  |PARTITIONED|
                                                   -- ASSIGN  |PARTITIONED|
                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -66,7 +66,7 @@
                                                                                 -- DATASOURCE_SCAN (test.Customer)  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
                                                   -- ASSIGN  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- DATASOURCE_SCAN (test.Orders)  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan
index 784ecdb..d1f0a82 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists_ps.plan
@@ -9,12 +9,12 @@
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- REPLICATE  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                      -- SORT_GROUP_BY[$$186]  |PARTITIONED|
+                      -- SORT_GROUP_BY[$$189]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
                                   -- NESTED_TUPLE_SOURCE  |LOCAL|
                               }
-                        -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$189]  |PARTITIONED|
                           -- SORT_GROUP_BY[$$163]  |PARTITIONED|
                                   {
                                     -- AGGREGATE  |LOCAL|
@@ -27,25 +27,25 @@
                                     -- STREAM_SELECT  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- SORT_GROUP_BY[$$183]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$186]  |PARTITIONED|
                                                   {
                                                     -- AGGREGATE  |LOCAL|
                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                   }
-                                            -- HASH_PARTITION_EXCHANGE [$$183]  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$177]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
+                                              -- PRE_CLUSTERED_GROUP_BY[$$180]  |PARTITIONED|
                                                       {
                                                         -- AGGREGATE  |LOCAL|
                                                           -- STREAM_SELECT  |LOCAL|
                                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                       }
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$177(ASC)]  |PARTITIONED|
+                                                  -- STABLE_SORT [$$180(ASC)]  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$172][$$169]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
+                                                          -- HYBRID_HASH_JOIN [$$175][$$172]  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$175]  |PARTITIONED|
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -72,7 +72,7 @@
                                                                                             -- DATASOURCE_SCAN (test.Customer)  |PARTITIONED|
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                   -- DATASOURCE_SCAN (test.Orders)  |PARTITIONED|
@@ -86,12 +86,12 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- REPLICATE  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- SORT_GROUP_BY[$$186]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$189]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
-                                  -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$189]  |PARTITIONED|
                                     -- SORT_GROUP_BY[$$163]  |PARTITIONED|
                                             {
                                               -- AGGREGATE  |LOCAL|
@@ -104,25 +104,25 @@
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- SORT_GROUP_BY[$$183]  |PARTITIONED|
+                                                    -- SORT_GROUP_BY[$$186]  |PARTITIONED|
                                                             {
                                                               -- AGGREGATE  |LOCAL|
                                                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                             }
-                                                      -- HASH_PARTITION_EXCHANGE [$$183]  |PARTITIONED|
-                                                        -- PRE_CLUSTERED_GROUP_BY[$$177]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$186]  |PARTITIONED|
+                                                        -- PRE_CLUSTERED_GROUP_BY[$$180]  |PARTITIONED|
                                                                 {
                                                                   -- AGGREGATE  |LOCAL|
                                                                     -- STREAM_SELECT  |LOCAL|
                                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                                 }
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STABLE_SORT [$$177(ASC)]  |PARTITIONED|
+                                                            -- STABLE_SORT [$$180(ASC)]  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- HYBRID_HASH_JOIN [$$172][$$169]  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
+                                                                    -- HYBRID_HASH_JOIN [$$175][$$172]  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$175]  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -149,7 +149,7 @@
                                                                                                       -- DATASOURCE_SCAN (test.Customer)  |PARTITIONED|
                                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                             -- DATASOURCE_SCAN (test.Orders)  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-2815.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-2815.plan
new file mode 100644
index 0000000..363b2bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/query-ASTERIXDB-2815.plan
@@ -0,0 +1,74 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$75(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$75(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- STREAM_SELECT  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- PRE_CLUSTERED_GROUP_BY[$$70]  |PARTITIONED|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- STREAM_SELECT  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$70][$$82]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- REPLICATE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN (test.RawTweet)  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- HASH_PARTITION_EXCHANGE [$$82]  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- STABLE_SORT [$$62(ASC), $$82(ASC)]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$62, $$82]  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- HYBRID_HASH_JOIN [$$ve][$$73]  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$ve]  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- UNNEST  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- HYBRID_HASH_JOIN [$$80][$$76]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$80]  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- DATASOURCE_SCAN (test.RawTweet)  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$76]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN (test.Verification)  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$73]  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- DATASOURCE_SCAN (test.Evidence)  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-2.plan
new file mode 100644
index 0000000..6e589bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-2.plan
@@ -0,0 +1,142 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- UNNEST  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- NESTED_LOOP  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- NESTED_LOOP  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- NESTED_LOOP  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                              -- AGGREGATE  |UNPARTITIONED|
+                                -- AGGREGATE  |UNPARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                    -- NESTED_LOOP  |UNPARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                        -- STREAM_PROJECT  |UNPARTITIONED|
+                                          -- STREAM_SELECT  |UNPARTITIONED|
+                                            -- STREAM_PROJECT  |UNPARTITIONED|
+                                              -- ASSIGN  |UNPARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                  -- REPLICATE  |UNPARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                      -- AGGREGATE  |UNPARTITIONED|
+                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                            -- AGGREGATE  |PARTITIONED|
+                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- REPLICATE  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                        -- STREAM_PROJECT  |UNPARTITIONED|
+                                          -- ASSIGN  |UNPARTITIONED|
+                                            -- AGGREGATE  |UNPARTITIONED|
+                                              -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                -- AGGREGATE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- AGGREGATE  |UNPARTITIONED|
+                            -- AGGREGATE  |UNPARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                -- NESTED_LOOP  |UNPARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                    -- STREAM_PROJECT  |UNPARTITIONED|
+                                      -- STREAM_SELECT  |UNPARTITIONED|
+                                        -- STREAM_PROJECT  |UNPARTITIONED|
+                                          -- ASSIGN  |UNPARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                              -- REPLICATE  |UNPARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                  -- AGGREGATE  |UNPARTITIONED|
+                                                    -- AGGREGATE  |UNPARTITIONED|
+                                                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                        -- AGGREGATE  |PARTITIONED|
+                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- REPLICATE  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                    -- STREAM_PROJECT  |UNPARTITIONED|
+                                      -- ASSIGN  |UNPARTITIONED|
+                                        -- AGGREGATE  |UNPARTITIONED|
+                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                            -- AGGREGATE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- STREAM_SELECT  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |UNPARTITIONED|
+                        -- ASSIGN  |UNPARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                            -- REPLICATE  |UNPARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                -- AGGREGATE  |UNPARTITIONED|
+                                  -- AGGREGATE  |UNPARTITIONED|
+                                    -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                      -- AGGREGATE  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated-2.plan
new file mode 100644
index 0000000..972e59e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated-2.plan
@@ -0,0 +1,336 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- UNNEST  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ASSIGN  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- PRE_CLUSTERED_GROUP_BY[$$227]  |PARTITIONED|
+                            {
+                              -- AGGREGATE  |LOCAL|
+                                -- AGGREGATE  |LOCAL|
+                                  -- STREAM_SELECT  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                            }
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$227][$$175]  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- PRE_CLUSTERED_GROUP_BY[$$189]  |PARTITIONED|
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- AGGREGATE  |LOCAL|
+                                              -- STREAM_SELECT  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STABLE_SORT [$$189(ASC)]  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$189][$$225]  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- PRE_CLUSTERED_GROUP_BY[$$137]  |PARTITIONED|
+                                                        {
+                                                          -- AGGREGATE  |LOCAL|
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- STREAM_SELECT  |LOCAL|
+                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                        }
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- HYBRID_HASH_JOIN [$$137][$$187]  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- SORT_GROUP_BY[$$234]  |PARTITIONED|
+                                                                          {
+                                                                            -- AGGREGATE  |LOCAL|
+                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                          }
+                                                                    -- HASH_PARTITION_EXCHANGE [$$234]  |PARTITIONED|
+                                                                      -- PRE_CLUSTERED_GROUP_BY[$$186]  |PARTITIONED|
+                                                                              {
+                                                                                -- AGGREGATE  |LOCAL|
+                                                                                  -- STREAM_SELECT  |LOCAL|
+                                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                              }
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STABLE_SORT [$$186(ASC)]  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- HYBRID_HASH_JOIN [$$170][$$169]  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                -- HASH_PARTITION_EXCHANGE [$$204]  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- PRE_CLUSTERED_GROUP_BY[$$208]  |PARTITIONED|
+                                                                                                                        {
+                                                                                                                          -- AGGREGATE  |LOCAL|
+                                                                                                                            -- AGGREGATE  |LOCAL|
+                                                                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                                        }
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- HYBRID_HASH_JOIN [$$208][$$209]  |PARTITIONED|
+                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                            -- REPLICATE  |PARTITIONED|
+                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                                                                    -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
+                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                        -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                      -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- SORT_GROUP_BY[$$240]  |PARTITIONED|
+                                                              {
+                                                                -- AGGREGATE  |LOCAL|
+                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                              }
+                                                        -- HASH_PARTITION_EXCHANGE [$$240]  |PARTITIONED|
+                                                          -- PRE_CLUSTERED_GROUP_BY[$$224]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- STREAM_SELECT  |LOCAL|
+                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- STABLE_SORT [$$224(ASC)]  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- HYBRID_HASH_JOIN [$$174][$$173]  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$174]  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- PRE_CLUSTERED_GROUP_BY[$$194]  |PARTITIONED|
+                                                                                              {
+                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                  -- AGGREGATE  |LOCAL|
+                                                                                                    -- STREAM_SELECT  |LOCAL|
+                                                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                              }
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- STABLE_SORT [$$194(ASC)]  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- HYBRID_HASH_JOIN [$$194][$$171]  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- PRE_CLUSTERED_GROUP_BY[$$195]  |PARTITIONED|
+                                                                                                              {
+                                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                                  -- AGGREGATE  |LOCAL|
+                                                                                                                    -- STREAM_SELECT  |LOCAL|
+                                                                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                              }
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              -- HYBRID_HASH_JOIN [$$195][$$197]  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- REPLICATE  |PARTITIONED|
+                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                            -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- SORT_GROUP_BY[$$237]  |PARTITIONED|
+                                                                                                                              {
+                                                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                                              }
+                                                                                                                        -- HASH_PARTITION_EXCHANGE [$$237]  |PARTITIONED|
+                                                                                                                          -- PRE_CLUSTERED_GROUP_BY[$$203]  |PARTITIONED|
+                                                                                                                                  {
+                                                                                                                                    -- AGGREGATE  |LOCAL|
+                                                                                                                                      -- STREAM_SELECT  |LOCAL|
+                                                                                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                                                  }
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- STABLE_SORT [$$203(ASC)]  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                        -- HYBRID_HASH_JOIN [$$204][$$207]  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- REPLICATE  |PARTITIONED|
+                                                                                                                                              -- HASH_PARTITION_EXCHANGE [$$204]  |PARTITIONED|
+                                                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                              -- PRE_CLUSTERED_GROUP_BY[$$208]  |PARTITIONED|
+                                                                                                                                                                      {
+                                                                                                                                                                        -- AGGREGATE  |LOCAL|
+                                                                                                                                                                          -- AGGREGATE  |LOCAL|
+                                                                                                                                                                            -- STREAM_SELECT  |LOCAL|
+                                                                                                                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                                                                                      }
+                                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                      -- HYBRID_HASH_JOIN [$$208][$$209]  |PARTITIONED|
+                                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                              -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                                                                                                                  -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
+                                                                                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- REPLICATE  |PARTITIONED|
+                                                                                                                                              -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
+                                                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                  -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- REPLICATE  |PARTITIONED|
+                                                                                                              -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$173]  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- REPLICATE  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
index 419c95f..1a70178 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
@@ -8,35 +8,31 @@
               -- ASSIGN  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- PRE_CLUSTERED_GROUP_BY[$$169]  |PARTITIONED|
+                    -- PRE_CLUSTERED_GROUP_BY[$$173]  |PARTITIONED|
                             {
                               -- AGGREGATE  |LOCAL|
                                 -- AGGREGATE  |LOCAL|
-                                  -- ASSIGN  |LOCAL|
-                                    -- AGGREGATE  |LOCAL|
-                                      -- STREAM_SELECT  |LOCAL|
-                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                  -- STREAM_SELECT  |LOCAL|
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
                             }
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- HYBRID_HASH_JOIN [$$169][$$170]  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$173][$$215]  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                 -- PRE_CLUSTERED_GROUP_BY[$$157]  |PARTITIONED|
                                         {
                                           -- AGGREGATE  |LOCAL|
                                             -- AGGREGATE  |LOCAL|
-                                              -- ASSIGN  |LOCAL|
-                                                -- AGGREGATE  |LOCAL|
-                                                  -- STREAM_SELECT  |LOCAL|
-                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              -- STREAM_SELECT  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
                                         }
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- STABLE_SORT [$$157(ASC)]  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- STREAM_PROJECT  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- HYBRID_HASH_JOIN [$$157][$$158]  |PARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$157][$$171]  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                 -- PRE_CLUSTERED_GROUP_BY[$$137]  |PARTITIONED|
                                                         {
@@ -64,7 +60,7 @@
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                   -- REPLICATE  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$192]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$198]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
@@ -77,144 +73,35 @@
                                                                                         -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                              -- HASH_PARTITION_EXCHANGE [$$158]  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- HYBRID_HASH_JOIN [$$150][$$149]  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ASSIGN  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- REPLICATE  |PARTITIONED|
-                                                                  -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- PRE_CLUSTERED_GROUP_BY[$$191]  |PARTITIONED|
-                                                                                          {
-                                                                                            -- AGGREGATE  |LOCAL|
-                                                                                              -- AGGREGATE  |LOCAL|
-                                                                                                -- STREAM_SELECT  |LOCAL|
-                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                          }
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- HYBRID_HASH_JOIN [$$191][$$192]  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- HASH_PARTITION_EXCHANGE [$$192]  |PARTITIONED|
-                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                        -- ASSIGN  |PARTITIONED|
-                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                            -- REPLICATE  |PARTITIONED|
-                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                    -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
-                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$149]  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ASSIGN  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- REPLICATE  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$170]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- HYBRID_HASH_JOIN [$$152][$$151]  |PARTITIONED|
-                                        -- HASH_PARTITION_EXCHANGE [$$152]  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- PRE_CLUSTERED_GROUP_BY[$$171]  |PARTITIONED|
-                                                                {
-                                                                  -- AGGREGATE  |LOCAL|
+                                                      -- SORT_GROUP_BY[$$221]  |PARTITIONED|
+                                                              {
+                                                                -- AGGREGATE  |LOCAL|
+                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                              }
+                                                        -- HASH_PARTITION_EXCHANGE [$$221]  |PARTITIONED|
+                                                          -- PRE_CLUSTERED_GROUP_BY[$$161]  |PARTITIONED|
+                                                                  {
                                                                     -- AGGREGATE  |LOCAL|
-                                                                      -- AGGREGATE  |LOCAL|
-                                                                        -- STREAM_SELECT  |LOCAL|
-                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                }
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STABLE_SORT [$$171(ASC)]  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- HYBRID_HASH_JOIN [$$171][$$174]  |PARTITIONED|
+                                                                      -- STREAM_SELECT  |LOCAL|
+                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- STABLE_SORT [$$161(ASC)]  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- PRE_CLUSTERED_GROUP_BY[$$191]  |PARTITIONED|
-                                                                                          {
-                                                                                            -- AGGREGATE  |LOCAL|
-                                                                                              -- AGGREGATE  |LOCAL|
-                                                                                                -- STREAM_SELECT  |LOCAL|
-                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                          }
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- HYBRID_HASH_JOIN [$$191][$$192]  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- ASSIGN  |PARTITIONED|
-                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- REPLICATE  |PARTITIONED|
-                                                                                                -- HASH_PARTITION_EXCHANGE [$$192]  |PARTITIONED|
-                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                        -- ASSIGN  |PARTITIONED|
-                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                            -- REPLICATE  |PARTITIONED|
-                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                    -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
-                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                      -- HASH_PARTITION_EXCHANGE [$$174]  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- HYBRID_HASH_JOIN [$$188][$$187]  |PARTITIONED|
+                                                                        -- HYBRID_HASH_JOIN [$$150][$$149]  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                   -- REPLICATE  |PARTITIONED|
-                                                                                    -- HASH_PARTITION_EXCHANGE [$$188]  |PARTITIONED|
+                                                                                    -- HASH_PARTITION_EXCHANGE [$$196]  |PARTITIONED|
                                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                                         -- ASSIGN  |PARTITIONED|
                                                                                           -- STREAM_PROJECT  |PARTITIONED|
@@ -222,7 +109,7 @@
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                 -- REPLICATE  |PARTITIONED|
                                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- PRE_CLUSTERED_GROUP_BY[$$191]  |PARTITIONED|
+                                                                                                    -- PRE_CLUSTERED_GROUP_BY[$$199]  |PARTITIONED|
                                                                                                             {
                                                                                                               -- AGGREGATE  |LOCAL|
                                                                                                                 -- AGGREGATE  |LOCAL|
@@ -232,7 +119,7 @@
                                                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                            -- HYBRID_HASH_JOIN [$$191][$$192]  |PARTITIONED|
+                                                                                                            -- HYBRID_HASH_JOIN [$$199][$$200]  |PARTITIONED|
                                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                                 -- REPLICATE  |PARTITIONED|
                                                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -241,41 +128,213 @@
                                                                                                                         -- ASSIGN  |PARTITIONED|
                                                                                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                                                        -- HASH_PARTITION_EXCHANGE [$$198]  |PARTITIONED|
+                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                    -- REPLICATE  |PARTITIONED|
+                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$149]  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$227]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- HASH_PARTITION_EXCHANGE [$$227]  |PARTITIONED|
+                                          -- PRE_CLUSTERED_GROUP_BY[$$177]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- STREAM_SELECT  |LOCAL|
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STABLE_SORT [$$177(ASC)]  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- HYBRID_HASH_JOIN [$$152][$$151]  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$152]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- PRE_CLUSTERED_GROUP_BY[$$178]  |PARTITIONED|
+                                                                                  {
+                                                                                    -- AGGREGATE  |LOCAL|
+                                                                                      -- AGGREGATE  |LOCAL|
+                                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                  }
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- STABLE_SORT [$$178(ASC)]  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- HYBRID_HASH_JOIN [$$178][$$181]  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                    -- PRE_CLUSTERED_GROUP_BY[$$199]  |PARTITIONED|
+                                                                                                            {
+                                                                                                              -- AGGREGATE  |LOCAL|
+                                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                                  -- STREAM_SELECT  |LOCAL|
+                                                                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                            }
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- HYBRID_HASH_JOIN [$$199][$$200]  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                                 -- REPLICATE  |PARTITIONED|
-                                                                                                                  -- HASH_PARTITION_EXCHANGE [$$192]  |PARTITIONED|
-                                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- ASSIGN  |PARTITIONED|
+                                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                                                        -- HASH_PARTITION_EXCHANGE [$$198]  |PARTITIONED|
+                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                    -- REPLICATE  |PARTITIONED|
+                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                            -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- SORT_GROUP_BY[$$224]  |PARTITIONED|
+                                                                                                      {
+                                                                                                        -- AGGREGATE  |LOCAL|
+                                                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                      }
+                                                                                                -- HASH_PARTITION_EXCHANGE [$$224]  |PARTITIONED|
+                                                                                                  -- PRE_CLUSTERED_GROUP_BY[$$194]  |PARTITIONED|
+                                                                                                          {
+                                                                                                            -- AGGREGATE  |LOCAL|
+                                                                                                              -- STREAM_SELECT  |LOCAL|
+                                                                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                          }
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- STABLE_SORT [$$194(ASC)]  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- HYBRID_HASH_JOIN [$$196][$$198]  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- REPLICATE  |PARTITIONED|
+                                                                                                                      -- HASH_PARTITION_EXCHANGE [$$196]  |PARTITIONED|
                                                                                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                                                                                           -- ASSIGN  |PARTITIONED|
-                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                              -- STREAM_SELECT  |PARTITIONED|
                                                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                  -- REPLICATE  |PARTITIONED|
                                                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                      -- PRE_CLUSTERED_GROUP_BY[$$199]  |PARTITIONED|
+                                                                                                                                              {
+                                                                                                                                                -- AGGREGATE  |LOCAL|
+                                                                                                                                                  -- AGGREGATE  |LOCAL|
+                                                                                                                                                    -- STREAM_SELECT  |LOCAL|
+                                                                                                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                                                              }
                                                                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                -- HASH_PARTITION_EXCHANGE [$$187]  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- REPLICATE  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
-                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                        -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- REPLICATE  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- HYBRID_HASH_JOIN [$$199][$$200]  |PARTITIONED|
+                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                      -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                        -- REPLICATE  |PARTITIONED|
+                                                                                                                                                          -- HASH_PARTITION_EXCHANGE [$$198]  |PARTITIONED|
+                                                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                              -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- REPLICATE  |PARTITIONED|
+                                                                                                                      -- HASH_PARTITION_EXCHANGE [$$198]  |PARTITIONED|
+                                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                          -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- REPLICATE  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.1.ddl.sqlpp
new file mode 100644
index 0000000..0f2bec8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.1.ddl.sqlpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.TestType as
+{
+  id : integer
+};
+
+create dataset cart(TestType) primary key id;
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.2.update.sqlpp
new file mode 100644
index 0000000..0e2d620
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.2.update.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into cart([
+  {
+    "id":1,
+    "cid":"c1",
+    "items":[
+      { "pid":"p1","qty":1,"price":10,"ts":1000 },
+      { "pid":"p2","qty":1,"price":20,"ts":2000 },
+      { "pid":"p3","qty":1,"price":30,"ts":3001 }
+    ]
+  },
+  {
+    "id":2,
+    "cid":"c2",
+    "items":[
+      { "pid":"p1","qty":1,"price":11,"ts":1100 },
+      { "pid":"p2","qty":1,"price":21,"ts":2100 },
+      { "pid":"p4","qty":1,"price":41,"ts":4101 }
+    ]
+  }
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.3.query.sqlpp
new file mode 100644
index 0000000..d69363f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.3.query.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+where i1.ts >= 2000 and i1.pid in
+(
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.4.query.sqlpp
new file mode 100644
index 0000000..5086e50
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.4.query.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- set `compiler.subplan.merge` "true";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.5.query.sqlpp
new file mode 100644
index 0000000..7a0a065
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.5.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- and hash-bcast join hint
+--- set `compiler.subplan.merge` "true";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid /* +hash-bcast */ in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.6.query.sqlpp
new file mode 100644
index 0000000..45b5441
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.6.query.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+--- test with subplan into subplan pushdown disabled
+set `compiler.subplan.merge` "false";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.7.query.sqlpp
new file mode 100644
index 0000000..80dd4a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/in_let/in_let.7.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+--- test with nested subplan pushdown disabled
+--- (in this case same result as in compiler.subplan.merge=false)
+set `compiler.subplan.nestedpushdown` "false";
+
+select c1.cid, i1.pid, i1.ts
+from cart c1 unnest c1.items i1
+let dup = (
+  select value i2.pid
+  from cart c2 unnest c2.items i2
+  where i2.ts >= 2000
+  group by i2.pid
+  having count(*) > 1
+)
+where i1.ts >= 2000 and i1.pid in dup
+order by c1.cid;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.1.ddl.sqlpp
new file mode 100644
index 0000000..7de1e4e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.1.ddl.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type t1 as {
+ _id: uuid
+};
+
+create dataset RawTweet(t1) primary key _id autogenerated;
+
+create dataset Evidence(t1) primary key _id autogenerated;
+
+create dataset Verification(t1) primary key _id autogenerated;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.2.update.sqlpp
new file mode 100644
index 0000000..29a4ff6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.2.update.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into RawTweet ([
+  { "id" : 1, "full_text": "text_1" }, -- total evidence : 2
+  { "id" : 2, "full_text": "text_2" }, -- total evidence : 4 (*) -- satisfies the query
+  { "id" : 3, "full_text": "text_3" }, -- total evidence : 0
+  { "id" : 4, "full_text": "text_4" }, -- total evidence : 6 (*) -- satisfies the query
+  { "id" : 5, "full_text": "text_5" }  -- total evidence : 1
+  ]);
+
+insert into Verification ([
+  { "ver_id" : 1001, "tweet_id": 1, "evidence": [ 2001, 2002 ] },
+  { "ver_id" : 1002, "tweet_id": 2, "evidence": [ 2003, 2004 ] },
+  { "ver_id" : 1003, "tweet_id": 2, "evidence": [ 2009, 2010 ] },
+  { "ver_id" : 1004, "tweet_id": 4, "evidence": [ 2004, 2005, 2006 ] },
+  { "ver_id" : 1005, "tweet_id": 4, "evidence": [ 2007, 2008, 2009 ] },
+  { "ver_id" : 1006, "tweet_id": 5, "evidence": [ 2001 ] }
+]);
+
+insert into Evidence ([
+  { "ev_id" : 2001, "url": "http://example.org/2001" },
+  { "ev_id" : 2002, "url": "http://example.org/2002" },
+  { "ev_id" : 2003, "url": "http://example.org/2003" },
+  { "ev_id" : 2004, "url": "http://example.org/2004" },
+  { "ev_id" : 2005, "url": "http://example.org/2005" },
+  { "ev_id" : 2006, "url": "http://example.org/2006" },
+  { "ev_id" : 2007, "url": "http://example.org/2007" },
+  { "ev_id" : 2008, "url": "http://example.org/2008" },
+  { "ev_id" : 2009, "url": "http://example.org/2009" },
+  { "ev_id" : 2010, "url": "http://example.org/2010" }
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.3.query.sqlpp
new file mode 100644
index 0000000..9072bb7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.3.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select t.id, array_sort(urls) urls
+from RawTweet t
+let urls = (
+  select distinct value e.url
+  from Verification v, v.evidence ve, Evidence e
+  where t.id = v.tweet_id and ve = e.ev_id
+)
+where array_count(urls) > 2
+order by t.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.3.query.sqlpp
index 329ac5e..303d899 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.3.query.sqlpp
@@ -19,6 +19,9 @@
 
 use tpcds;
 
+--- test with subplan into subplan pushdown disabled
+set `compiler.subplan.merge` "false";
+
 select case when (select value count(ss)
                   from store_sales ss
                   where ss_quantity >= 1 and ss_quantity <= 20)[0] < 25437
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.4.query.sqlpp
new file mode 100644
index 0000000..fff3f85
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.4.query.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpcds;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- set `compiler.subplan.merge` "true";
+
+select case when (select value count(ss)
+                  from store_sales ss
+                  where ss_quantity >= 1 and ss_quantity <= 20)[0] < 25437
+            then (select avg(ss_ext_discount_amt)
+                  from store_sales
+                  where ss_quantity >= 1 and ss_quantity <= 20)
+            else (select avg(ss_net_profit)
+                  from store_sales
+                  where ss_quantity >= 1 and ss_quantity <= 20)
+            end bucket1
+from item
+where i_item_sk = 1;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.3.query.sqlpp
index a46ffe0..21526c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.3.query.sqlpp
@@ -19,6 +19,9 @@
 
 use tpcds;
 
+--- test with subplan into subplan pushdown disabled
+set `compiler.subplan.merge` "false";
+
 // The case expression contains correlated subqueries.
 select case when (select value count(ss)
                   from store_sales ss
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.4.query.sqlpp
new file mode 100644
index 0000000..cb856d5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.4.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpcds;
+
+--- test with subplan into subplan pushdown enabled (default)
+--- set `compiler.subplan.merge` "true";
+
+// The case expression contains correlated subqueries.
+select case when (select value count(ss)
+                  from store_sales ss
+                  where ss_quantity = item.i_item_sk)[0] < 25437
+            then (select avg(ss_ext_discount_amt)
+                  from store_sales
+                  where ss_quantity = item.i_item_sk)
+            else (select avg(ss_net_profit)
+                  from store_sales
+                  where ss_quantity = item.i_item_sk)
+            end bucket1
+from item
+where i_item_sk = 2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index cc38ea0..378fc83 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -18,6 +18,8 @@
     "compiler\.sort\.parallel" : false,
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
+    "compiler\.subplan\.merge" : true,
+    "compiler\.subplan\.nestedpushdown" : true,
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index e8af59d..5ff1b7a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -18,6 +18,8 @@
     "compiler\.sort\.parallel" : true,
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
+    "compiler\.subplan\.merge" : true,
+    "compiler\.subplan\.nestedpushdown" : true,
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 9d877c9..2e76d04 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -18,6 +18,8 @@
     "compiler\.sort\.parallel" : true,
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
+    "compiler\.subplan\.merge" : true,
+    "compiler\.subplan\.nestedpushdown" : true,
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.3.adm
new file mode 100644
index 0000000..f63d51b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.3.adm
@@ -0,0 +1,2 @@
+{ "cid": "c1", "pid": "p2", "ts": 2000 }
+{ "cid": "c2", "pid": "p2", "ts": 2100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.4.adm
new file mode 100644
index 0000000..f63d51b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.4.adm
@@ -0,0 +1,2 @@
+{ "cid": "c1", "pid": "p2", "ts": 2000 }
+{ "cid": "c2", "pid": "p2", "ts": 2100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.5.adm
new file mode 100644
index 0000000..f63d51b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.5.adm
@@ -0,0 +1,2 @@
+{ "cid": "c1", "pid": "p2", "ts": 2000 }
+{ "cid": "c2", "pid": "p2", "ts": 2100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.6.adm
new file mode 100644
index 0000000..f63d51b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.6.adm
@@ -0,0 +1,2 @@
+{ "cid": "c1", "pid": "p2", "ts": 2000 }
+{ "cid": "c2", "pid": "p2", "ts": 2100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.7.adm
new file mode 100644
index 0000000..f63d51b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/in_let/in_let.7.adm
@@ -0,0 +1,2 @@
+{ "cid": "c1", "pid": "p2", "ts": 2000 }
+{ "cid": "c2", "pid": "p2", "ts": 2100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.3.adm
new file mode 100644
index 0000000..448de2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/query-ASTERIXDB-2815/query-ASTERIXDB-2815.3.adm
@@ -0,0 +1,2 @@
+{ "urls": [ "http://example.org/2003", "http://example.org/2004", "http://example.org/2009", "http://example.org/2010" ], "id": 2 }
+{ "urls": [ "http://example.org/2004", "http://example.org/2005", "http://example.org/2006", "http://example.org/2007", "http://example.org/2008", "http://example.org/2009" ], "id": 4 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.2.adm
new file mode 100644
index 0000000..2f7bdfc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1581-2/query-ASTERIXDB-1581-2.2.adm
@@ -0,0 +1 @@
+{ "bucket1": [ { "$1": 24.261666666666667 } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.2.adm
new file mode 100644
index 0000000..d9395d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/tpcds/query-ASTERIXDB-1581-correlated/query-ASTERIXDB-1581-correlated.2.adm
@@ -0,0 +1 @@
+{ "bucket1": [ { "$1": 46.03 } ] }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index f647495..837e050 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -10318,6 +10318,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="subquery">
+      <compilation-unit name="in_let">
+        <output-dir compare="Text">in_let</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="subquery">
       <compilation-unit name="not_exists">
         <output-dir compare="Text">not_exists</output-dir>
       </compilation-unit>
@@ -10405,6 +10410,11 @@
         <output-dir compare="Text">query-ASTERIXDB-1674</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="subquery">
+      <compilation-unit name="query-ASTERIXDB-2815">
+        <output-dir compare="Text">query-ASTERIXDB-2815</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="subset-collection">
     <test-case FilePath="subset-collection">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 442a3e0..9340110 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -83,7 +83,15 @@
         COMPILER_EXTERNAL_FIELD_PUSHDOWN(
                 BOOLEAN,
                 AlgebricksConfig.EXTERNAL_FIELD_PUSHDOWN_DEFAULT,
-                "Enable pushdown of field accesses to the external dataset data-scan operator");
+                "Enable pushdown of field accesses to the external dataset data-scan operator"),
+        COMPILER_SUBPLAN_MERGE(
+                BOOLEAN,
+                AlgebricksConfig.SUBPLAN_MERGE_DEFAULT,
+                "Enable merging subplans with other subplans"),
+        COMPILER_SUBPLAN_NESTEDPUSHDOWN(
+                BOOLEAN,
+                AlgebricksConfig.SUBPLAN_NESTEDPUSHDOWN_DEFAULT,
+                "When merging subplans into groupby/suplan allow nesting of subplans");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -138,6 +146,10 @@
 
     public static final String COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY = Option.COMPILER_EXTERNAL_FIELD_PUSHDOWN.ini();
 
+    public static final String COMPILER_SUBPLAN_MERGE_KEY = Option.COMPILER_SUBPLAN_MERGE.ini();
+
+    public static final String COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY = Option.COMPILER_SUBPLAN_NESTEDPUSHDOWN.ini();
+
     public static final int COMPILER_PARALLELISM_AS_STORAGE = 0;
 
     public CompilerProperties(PropertiesAccessor accessor) {
@@ -191,4 +203,12 @@
     public boolean isFieldAccessPushdown() {
         return accessor.getBoolean(Option.COMPILER_EXTERNAL_FIELD_PUSHDOWN);
     }
+
+    public boolean getSubplanMerge() {
+        return accessor.getBoolean(Option.COMPILER_SUBPLAN_MERGE);
+    }
+
+    public boolean getSubplanNestedPushdown() {
+        return accessor.getBoolean(Option.COMPILER_SUBPLAN_NESTEDPUSHDOWN);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index 0402b97..5079b25 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -66,6 +66,10 @@
                 compilerProperties.isSanityCheck());
         boolean externalFieldPushdown = getBoolean(querySpecificConfig,
                 CompilerProperties.COMPILER_EXTERNAL_FIELD_PUSHDOWN_KEY, compilerProperties.isFieldAccessPushdown());
+        boolean subplanMerge = getBoolean(querySpecificConfig, CompilerProperties.COMPILER_SUBPLAN_MERGE_KEY,
+                compilerProperties.getSubplanMerge());
+        boolean subplanNestedPushdown = getBoolean(querySpecificConfig,
+                CompilerProperties.COMPILER_SUBPLAN_NESTEDPUSHDOWN_KEY, compilerProperties.getSubplanNestedPushdown());
 
         PhysicalOptimizationConfig physOptConf = new PhysicalOptimizationConfig();
         physOptConf.setFrameSize(frameSize);
@@ -79,6 +83,8 @@
         physOptConf.setIndexOnly(indexOnly);
         physOptConf.setSanityCheckEnabled(sanityCheck);
         physOptConf.setExternalFieldPushdown(externalFieldPushdown);
+        physOptConf.setSubplanMerge(subplanMerge);
+        physOptConf.setSubplanNestedPushdown(subplanNestedPushdown);
         return physOptConf;
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
index 6fd1c6a..297f08e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -25,14 +25,19 @@
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.storage.common.LocalResource;
 
+@SuppressWarnings("squid:S2160") // don't override equals
 public class DatasetResourceReference extends ResourceReference {
 
-    private int datasetId;
-    private int partitionId;
-    private long resourceId;
+    private final int datasetId;
+    private final int partitionId;
+    private final long resourceId;
 
-    private DatasetResourceReference() {
-        super();
+    private DatasetResourceReference(LocalResource localResource) {
+        super(Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString());
+        final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
+        datasetId = dsResource.getDatasetId();
+        partitionId = dsResource.getPartition();
+        resourceId = localResource.getId();
     }
 
     public static DatasetResourceReference of(LocalResource localResource) {
@@ -53,39 +58,6 @@
     }
 
     private static DatasetResourceReference parse(LocalResource localResource) {
-        final DatasetResourceReference datasetResourceReference = new DatasetResourceReference();
-        final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
-        parse(datasetResourceReference, filePath);
-        assignIds(localResource, datasetResourceReference);
-        return datasetResourceReference;
-    }
-
-    private static void assignIds(LocalResource localResource, DatasetResourceReference lrr) {
-        final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
-        lrr.datasetId = dsResource.getDatasetId();
-        lrr.partitionId = dsResource.getPartition();
-        lrr.resourceId = localResource.getId();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o instanceof ResourceReference) {
-            ResourceReference that = (ResourceReference) o;
-            return getRelativePath().toString().equals(that.getRelativePath().toString());
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return getRelativePath().toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return getRelativePath().toString();
+        return new DatasetResourceReference(localResource);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index c3b6229..7791926 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -28,15 +28,29 @@
 
 public class ResourceReference {
 
-    protected String root;
-    protected String partition;
-    protected String dataverse; // == DataverseName.getCanonicalForm()
-    protected String dataset;
-    protected String rebalance;
-    protected String index;
-    protected String name;
+    protected final String root;
+    protected final String partition;
+    protected final String dataverse; // == DataverseName.getCanonicalForm()
+    protected final String dataset;
+    protected final String rebalance;
+    protected final String index;
+    protected final String name;
+    private volatile Path relativePath;
 
-    protected ResourceReference() {
+    protected ResourceReference(String path) {
+        // format: root/partition/dataverse/dataset/rebalanceCount/index/fileName
+        final String[] tokens = StringUtils.split(path, File.separatorChar);
+        if (tokens.length < 6) {
+            throw new IllegalStateException("Unrecognized path structure: " + path);
+        }
+        int offset = tokens.length;
+        name = tokens[--offset];
+        index = tokens[--offset];
+        rebalance = tokens[--offset];
+        dataset = tokens[--offset];
+        dataverse = tokens[--offset]; //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
+        partition = tokens[--offset];
+        root = tokens[--offset];
     }
 
     public static ResourceReference ofIndex(String indexPath) {
@@ -44,9 +58,7 @@
     }
 
     public static ResourceReference of(String localResourcePath) {
-        ResourceReference lrr = new ResourceReference();
-        parse(lrr, localResourcePath);
-        return lrr;
+        return new ResourceReference(localResourcePath);
     }
 
     public String getPartition() {
@@ -74,7 +86,10 @@
     }
 
     public Path getRelativePath() {
-        return Paths.get(root, partition, dataverse, dataset, rebalance, index);
+        if (relativePath == null) {
+            relativePath = Paths.get(root, partition, dataverse, dataset, rebalance, index);
+        }
+        return relativePath;
     }
 
     public ResourceReference getDatasetReference() {
@@ -86,22 +101,6 @@
         return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
     }
 
-    protected static void parse(ResourceReference ref, String path) {
-        // format: root/partition/dataverse/dataset/rebalanceCount/index/fileName
-        final String[] tokens = StringUtils.split(path, File.separatorChar);
-        if (tokens.length < 6) {
-            throw new IllegalStateException("Unrecognized path structure: " + path);
-        }
-        int offset = tokens.length;
-        ref.name = tokens[--offset];
-        ref.index = tokens[--offset];
-        ref.rebalance = tokens[--offset];
-        ref.dataset = tokens[--offset];
-        ref.dataverse = tokens[--offset]; //TODO(MULTI_PART_DATAVERSE_NAME):REVISIT
-        ref.partition = tokens[--offset];
-        ref.root = tokens[--offset];
-    }
-
     public int getPartitionNum() {
         return Integer.parseInt(partition.substring(StorageConstants.PARTITION_DIR_PREFIX.length()));
     }
@@ -113,14 +112,14 @@
         }
         if (o instanceof ResourceReference) {
             ResourceReference that = (ResourceReference) o;
-            return getRelativePath().toString().equals(that.getRelativePath().toString());
+            return getRelativePath().equals(that.getRelativePath());
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return getRelativePath().toString().hashCode();
+        return getRelativePath().hashCode();
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
index 1b20c69..5236280 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
@@ -63,6 +63,14 @@
         return allRoots;
     }
 
+    public int getNumberOfRoots() {
+        int n = 0;
+        for (ILogicalPlan p : nestedPlans) {
+            n += p.getRoots().size();
+        }
+        return n;
+    }
+
     //
     // @Override
     // public void computeConstraintsAndEquivClasses() {
@@ -123,5 +131,4 @@
     public abstract void getUsedVariablesExceptNestedPlans(Collection<LogicalVariable> vars);
 
     public abstract void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars);
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
index 62b6a2d..4e86e2e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -76,10 +76,6 @@
         schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
     }
 
-    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
-        // do nothing
-    }
-
     public int getOutputArity() {
         return outputArity;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index 202c291..ade9552 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -23,11 +23,9 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -80,15 +78,6 @@
         for (int i = 0; i < n; i++) {
             env.setVarType(variables.get(i), ctx.getExpressionTypeComputer().getType(expressions.get(i).getValue(),
                     ctx.getMetadataProvider(), env));
-            if (expressions.get(i).getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                LogicalVariable var =
-                        ((VariableReferenceExpression) expressions.get(i).getValue()).getVariableReference();
-                for (List<LogicalVariable> list : env.getCorrelatedMissableVariableLists()) {
-                    if (list.contains(var)) {
-                        list.add(variables.get(i));
-                    }
-                }
-            }
         }
         return env;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index 9ac6659..673d80d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -52,8 +52,8 @@
 
     @Override
     public void recomputeSchema() {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(this.getDistinctByVarList());
+        schema = new ArrayList<>();
+        schema.addAll(getDistinctByVarList());
         List<LogicalVariable> inputSchema = inputs.get(0).getValue().getSchema();
         for (LogicalVariable var : inputSchema) {
             if (!schema.contains(var)) {
@@ -66,13 +66,12 @@
     public VariablePropagationPolicy getVariablePropagationPolicy() {
         return new VariablePropagationPolicy() {
             @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
-                /** make sure distinct key vars laid-out first */
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
+                /* make sure distinct key vars laid-out first */
                 for (LogicalVariable keyVar : getDistinctByVarList()) {
                     target.addVariable(keyVar);
                 }
-                /** add other source vars */
+                /* add other source vars */
                 for (IOperatorSchema srcSchema : sources) {
                     for (LogicalVariable srcVar : srcSchema)
                         if (target.findVariable(srcVar) < 0) {
@@ -105,7 +104,7 @@
     }
 
     public List<LogicalVariable> getDistinctByVarList() {
-        List<LogicalVariable> varList = new ArrayList<LogicalVariable>(expressions.size());
+        List<LogicalVariable> varList = new ArrayList<>(expressions.size());
         for (Mutable<ILogicalExpression> eRef : expressions) {
             ILogicalExpression e = eRef.getValue();
             if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
@@ -121,7 +120,7 @@
             ILogicalExpression e = eRef.getValue();
             if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                 VariableReferenceExpression v = (VariableReferenceExpression) e;
-                if (v.getVariableReference() == var) {
+                if (v.getVariableReference().equals(var)) {
                     return true;
                 }
             }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index 0b280f4..9553913 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -169,7 +169,7 @@
                         target.addVariable(p.first);
                     } else {
                         if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                            throw new AlgebricksException("hash group-by expects variable references.");
+                            throw new AlgebricksException("group-by expects variable references.");
                         }
                         VariableReferenceExpression v = (VariableReferenceExpression) expr;
                         target.addVariable(v.getVariableReference());
@@ -178,7 +178,7 @@
                 for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
                     ILogicalExpression expr = p.second.getValue();
                     if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                        throw new AlgebricksException("pre-sorted group-by expects variable references.");
+                        throw new AlgebricksException("group-by expects variable references.");
                     }
                     VariableReferenceExpression v = (VariableReferenceExpression) expr;
                     LogicalVariable decor = v.getVariableReference();
@@ -241,6 +241,7 @@
                     env.setVarType(v1, env2.getVarType(v1));
                 }
             } else {
+                // TODO (dmitry): this needs to be revisited
                 VariableReferenceExpression vre = (VariableReferenceExpression) p.second.getValue();
                 LogicalVariable v2 = vre.getVariableReference();
                 env.setVarType(v2, env2.getVarType(v2));
@@ -251,6 +252,7 @@
             if (p.first != null) {
                 env.setVarType(p.first, env2.getType(expr));
             } else {
+                // TODO (dmitry): this needs to be revisited
                 VariableReferenceExpression vre = (VariableReferenceExpression) p.second.getValue();
                 LogicalVariable v2 = vre.getVariableReference();
                 env.setVarType(v2, env2.getVarType(v2));
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index 154fb13..c84db0e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -86,6 +86,12 @@
                 b = true;
             }
         }
+        // Filtering
+        if (filterExpr != null) {
+            if (visitor.transform(filterExpr)) {
+                b = true;
+            }
+        }
         // Additional Filtering <For upsert>
         if (additionalFilteringExpressions != null) {
             for (int i = 0; i < additionalFilteringExpressions.size(); i++) {
@@ -94,12 +100,10 @@
                 }
             }
         }
-
         // Upsert indicator var <For upsert>
         if (upsertIndicatorExpr != null && visitor.transform(upsertIndicatorExpr)) {
             b = true;
         }
-
         // Old secondary <For upsert>
         if (prevSecondaryKeyExprs != null) {
             for (int i = 0; i < prevSecondaryKeyExprs.size(); i++) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
index ae90462..ce2f801 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -102,25 +102,22 @@
     }
 
     public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
-        if (upsertIndicatorVar != null) {
+        if (operation == Kind.UPSERT) {
             producedVariables.add(upsertIndicatorVar);
-        }
-        if (prevRecordVar != null) {
             producedVariables.add(prevRecordVar);
-        }
-        if (prevAdditionalNonFilteringVars != null) {
-            producedVariables.addAll(prevAdditionalNonFilteringVars);
-        }
-        if (prevFilterVar != null) {
-            producedVariables.add(prevFilterVar);
+            if (prevAdditionalNonFilteringVars != null) {
+                producedVariables.addAll(prevAdditionalNonFilteringVars);
+            }
+            if (prevFilterVar != null) {
+                producedVariables.add(prevFilterVar);
+            }
         }
     }
 
     @Override
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
             throws AlgebricksException {
-        boolean changed = false;
-        changed = transform.transform(payloadExpr);
+        boolean changed = transform.transform(payloadExpr);
         for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
             changed |= transform.transform(e);
         }
@@ -151,8 +148,7 @@
     public VariablePropagationPolicy getVariablePropagationPolicy() {
         return new VariablePropagationPolicy() {
             @Override
-            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
-                    throws AlgebricksException {
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources) {
                 if (operation == Kind.UPSERT) {
                     target.addVariable(upsertIndicatorVar);
                     target.addVariable(prevRecordVar);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java
index 797c5eb..4e382d2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterJoinOperator.java
@@ -67,8 +67,8 @@
         PropagatingTypeEnvironment env =
                 new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMissableTypeComputer(),
                         ctx.getMetadataProvider(), TypePropagationPolicy.LEFT_OUTER, envPointers);
-        List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
-        VariableUtilities.getLiveVariables(inputs.get(1).getValue(), liveVars); // live variables from outer branch can be null together
+        List<LogicalVariable> liveVars = new ArrayList<>();
+        VariableUtilities.getLiveVariables(inputs.get(1).getValue(), liveVars); // live variables from right branch can be MISSING together
         env.getCorrelatedMissableVariableLists().add(liveVars);
         return env;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestMapOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestMapOperator.java
index 6bacdb4..cd009c0 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestMapOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestMapOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -61,10 +60,7 @@
         // Propagates all input variables that come from the outer branch.
         PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
 
-        env.getCorrelatedMissableVariableLists().add(new ArrayList<>(variables));
-
-        // For the variables from the inner branch, the output type is the union
-        // of (original type + null).
+        // The produced variables of the this operator are missable because of the left outer semantics.
         for (int i = 0; i < variables.size(); i++) {
             env.setVarType(variables.get(i), ctx.getMissableTypeComputer().makeMissableType(variableTypes.get(i)));
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java
index 8c95a3f..14996dd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/LeftOuterUnnestOperator.java
@@ -14,9 +14,6 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -46,21 +43,14 @@
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+
+        // The produced variables of the this operator are missable because of the left outer semantics.
         Object t = env.getType(expression.getValue());
-        // For the variables from the inner branch, the output type is the union
-        // of (original type + missing).
         env.setVarType(variables.get(0), ctx.getMissableTypeComputer().makeMissableType(t));
         if (positionalVariable != null) {
             env.setVarType(positionalVariable, ctx.getMissableTypeComputer().makeMissableType(positionalVariableType));
         }
 
-        // The produced variables of the this operator are missable because of the left outer semantics.
-        List<LogicalVariable> missableVars = new ArrayList<>();
-        missableVars.add(variables.get(0));
-        if (positionalVariable != null) {
-            missableVars.add(positionalVariable);
-        }
-        env.getCorrelatedMissableVariableLists().add(missableVars);
         return env;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
index 26ce137..b2e2dfd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
@@ -113,7 +113,7 @@
                 ILogicalExpression a2 = f2.getArguments().get(0).getValue();
                 if (a2.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                     LogicalVariable var = ((VariableReferenceExpression) a2).getVariableReference();
-                    env.getNonNullVariables().add(var);
+                    env.getNonMissableVariables().add(var);
                 }
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
index 7be761b..62ce60ce 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -18,7 +18,6 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -76,10 +75,4 @@
     public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
         return visitor.transform(branchingExpression);
     }
-
-    @Override
-    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
-        getBranchingExpression().getValue().substituteVar(v1, v2);
-    }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 54cbcd0..6ed90a5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -88,10 +88,13 @@
     private static final long ONE = 1L;
     private static final long ZERO_OR_ONE_GROUP = 2L;
     private static final long ONE_GROUP = 3L;
-    private static final long UNKNOWN = 1000L;
+    private static final long UNKNOWN = 100L; // so it fits into the auto-boxing cache
 
     private final Set<LogicalVariable> keyVariables = new HashSet<>();
 
+    private CardinalityInferenceVisitor() {
+    }
+
     @Override
     public Long visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
         return ONE;
@@ -244,6 +247,8 @@
 
     @Override
     public Long visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        // DISTINCT cannot reduce cardinality from ONE to ZERO_OR_ONE, or from ONE_GROUP to ZERO_OR_ONE_GROUP
+        // therefore we don't need to call adjustCardinalityForTupleReductionOperator() here.
         return op.getInputs().get(0).getValue().accept(this, arg);
     }
 
@@ -384,4 +389,15 @@
         return inputCardinality;
     }
 
+    public static boolean isCardinalityExactOne(ILogicalOperator operator) throws AlgebricksException {
+        CardinalityInferenceVisitor visitor = new CardinalityInferenceVisitor();
+        long cardinality = operator.accept(visitor, null);
+        return cardinality == ONE;
+    }
+
+    public static boolean isCardinalityZeroOrOne(ILogicalOperator operator) throws AlgebricksException {
+        CardinalityInferenceVisitor visitor = new CardinalityInferenceVisitor();
+        long cardinality = operator.accept(visitor, null);
+        return cardinality == ZERO_OR_ONE || cardinality == ONE;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 3a4010e..e05d12a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -155,20 +155,12 @@
     }
 
     public ILogicalOperator deepCopy(ILogicalOperator op) throws AlgebricksException {
-        // The deep copy call outside this visitor always has a null argument.
-        return deepCopy(op, null);
-    }
-
-    private ILogicalOperator deepCopy(ILogicalOperator op, ILogicalOperator arg) throws AlgebricksException {
-        if (op == null) {
-            return null;
-        }
         if (reuseFreeVars) {
             // If the reuseFreeVars flag is set, we collect all free variables in the
             // given operator subtree and do not re-map them in the deep-copied plan.
             OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) op, freeVars);
         }
-        ILogicalOperator opCopy = op.accept(this, arg);
+        ILogicalOperator opCopy = deepCopyOperator(op, null);
         if (typeContext != null) {
             OperatorManipulationUtil.computeTypeEnvironmentBottomUp(opCopy, typeContext);
         }
@@ -184,9 +176,13 @@
         }
     }
 
+    private ILogicalOperator deepCopyOperator(ILogicalOperator op, ILogicalOperator arg) throws AlgebricksException {
+        return op != null ? op.accept(this, arg) : null;
+    }
+
     private Mutable<ILogicalOperator> deepCopyOperatorReference(Mutable<ILogicalOperator> opRef, ILogicalOperator arg)
             throws AlgebricksException {
-        return new MutableObject<>(deepCopy(opRef.getValue(), arg));
+        return new MutableObject<>(deepCopyOperator(opRef.getValue(), arg));
     }
 
     private List<Mutable<ILogicalOperator>> deepCopyOperatorReferenceList(List<Mutable<ILogicalOperator>> list,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 61da1b5..a2107e5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -69,6 +70,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -76,9 +78,11 @@
 public class SubstituteVariableVisitor
         implements ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> {
 
-    private final boolean goThroughNts;
     private final ITypingContext ctx;
 
+    //TODO(dmitry):unused -> remove
+    private final boolean goThroughNts;
+
     public SubstituteVariableVisitor(boolean goThroughNts, ITypingContext ctx) {
         this.goThroughNts = goThroughNts;
         this.ctx = ctx;
@@ -87,111 +91,125 @@
     @Override
     public Void visitAggregateOperator(AggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
-        substVarTypes(op, pair);
+        boolean producedVarFound =
+                substAssignVariables(op.getVariables(), op.getExpressions(), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        }
         return null;
     }
 
     @Override
     public Void visitAssignOperator(AssignOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
-        // Substitute variables stored in ordering property
-        if (op.getExplicitOrderingProperty() != null) {
-            List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns();
-            for (int i = 0; i < orderColumns.size(); i++) {
-                OrderColumn oc = orderColumns.get(i);
-                if (oc.getColumn().equals(pair.first)) {
-                    orderColumns.set(i, new OrderColumn(pair.second, oc.getOrder()));
+        boolean producedVarFound =
+                substAssignVariables(op.getVariables(), op.getExpressions(), pair.first, pair.second);
+        if (producedVarFound) {
+            // Substitute variables stored in ordering property
+            if (op.getExplicitOrderingProperty() != null) {
+                List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns();
+                List<OrderColumn> newOrderColumns = new ArrayList<>(orderColumns.size());
+                for (OrderColumn oc : orderColumns) {
+                    LogicalVariable columnVar = oc.getColumn();
+                    LogicalVariable newColumnVar = columnVar.equals(pair.first) ? pair.second : columnVar;
+                    newOrderColumns.add(new OrderColumn(newColumnVar, oc.getOrder()));
                 }
+                op.setExplicitOrderingProperty(new LocalOrderProperty(newOrderColumns));
             }
+
+            substProducedVarInTypeEnvironment(op, pair);
         }
-        substVarTypes(op, pair);
         return null;
     }
 
     @Override
     public Void visitDataScanOperator(DataSourceScanOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        for (int i = 0; i < variables.size(); i++) {
-            if (variables.get(i) == pair.first) {
-                variables.set(i, pair.second);
-                return null;
+        boolean producedVarFound = substProducedVariables(op.getVariables(), pair.first, pair.second);
+        if (!producedVarFound) {
+            if (op.isProjectPushed()) {
+                producedVarFound = substProducedVariables(op.getProjectVariables(), pair.first, pair.second);
             }
         }
-        if (op.getSelectCondition() != null) {
-            op.getSelectCondition().getValue().substituteVar(pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            substUsedVariablesInExpr(op.getSelectCondition(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
+            substUsedVariables(op.getMinFilterVars(), pair.first, pair.second);
+            substUsedVariables(op.getMaxFilterVars(), pair.first, pair.second);
         }
-        substVarTypes(op, pair);
         return null;
     }
 
     @Override
     public Void visitDistinctOperator(DistinctOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        for (Mutable<ILogicalExpression> eRef : op.getExpressions()) {
-            eRef.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
         return null;
     }
 
     @Override
     public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
             Pair<LogicalVariable, LogicalVariable> pair) {
-        // does not use any variable
+        // does not produce/use any variables
         return null;
     }
 
     @Override
     public Void visitExchangeOperator(ExchangeOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
-        // does not use any variable
+        // does not produce/use any variables
         return null;
     }
 
     @Override
     public Void visitGroupByOperator(GroupByOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        subst(pair.first, pair.second, op.getGroupByList());
-        subst(pair.first, pair.second, op.getDecorList());
-        substInNestedPlans(pair.first, pair.second, op);
-        substVarTypes(op, pair);
+        boolean producedVarFound = substGbyVariables(op.getGroupByList(), pair.first, pair.second);
+        if (!producedVarFound) {
+            producedVarFound = substGbyVariables(op.getDecorList(), pair.first, pair.second);
+        }
+        if (!producedVarFound) {
+            substInNestedPlans(op, pair.first, pair.second);
+        }
+        // GROUP BY operator may add its used variables
+        // to its own output type environment as produced variables
+        // therefore we need perform variable substitution in its own type environment
+        // TODO (dmitry): this needs to be revisited
+        substProducedVarInTypeEnvironment(op, pair);
         return null;
     }
 
     @Override
     public Void visitInnerJoinOperator(InnerJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        op.getCondition().getValue().substituteVar(pair.first, pair.second);
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getCondition(), pair.first, pair.second);
         return null;
     }
 
     @Override
     public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        op.getCondition().getValue().substituteVar(pair.first, pair.second);
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getCondition(), pair.first, pair.second);
+        // LEFT OUTER JOIN operator adds its right branch variables
+        // to its own output type environment as 'correlatedMissableVariables'
+        // therefore we need perform variable substitution in its own type environment
+        substProducedVarInTypeEnvironment(op, pair);
         return null;
     }
 
     @Override
     public Void visitLimitOperator(LimitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        if (op.hasMaxObjects()) {
-            op.getMaxObjects().getValue().substituteVar(pair.first, pair.second);
-        }
-        if (op.hasOffset()) {
-            op.getOffset().getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getMaxObjects(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getOffset(), pair.first, pair.second);
         return null;
     }
 
     @Override
     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
             Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
+        // does not produce/use any variables
         return null;
     }
 
@@ -199,88 +217,87 @@
     public Void visitOrderOperator(OrderOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) {
-            oe.second.getValue().substituteVar(pair.first, pair.second);
+            substUsedVariablesInExpr(oe.second, pair.first, pair.second);
         }
-        substVarTypes(op, pair);
         return null;
     }
 
     @Override
     public Void visitProjectOperator(ProjectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<LogicalVariable> usedVariables = op.getVariables();
-        int n = usedVariables.size();
-        for (int i = 0; i < n; i++) {
-            LogicalVariable v = usedVariables.get(i);
-            if (v.equals(pair.first)) {
-                usedVariables.set(i, pair.second);
-            }
-        }
-        substVarTypes(op, pair);
+        substUsedVariables(op.getVariables(), pair.first, pair.second);
         return null;
     }
 
     @Override
     public Void visitRunningAggregateOperator(RunningAggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
-        substVarTypes(op, pair);
+        boolean producedVarFound =
+                substAssignVariables(op.getVariables(), op.getExpressions(), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        }
         return null;
     }
 
     @Override
     public Void visitScriptOperator(ScriptOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substInArray(op.getInputVariables(), pair.first, pair.second);
-        substInArray(op.getOutputVariables(), pair.first, pair.second);
-        substVarTypes(op, pair);
+        boolean producedVarFound = substProducedVariables(op.getOutputVariables(), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            substUsedVariables(op.getInputVariables(), pair.first, pair.second);
+        }
         return null;
     }
 
     @Override
-    public Void visitSelectOperator(SelectOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
-        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+    public Void visitSelectOperator(SelectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substUsedVariablesInExpr(op.getCondition(), pair.first, pair.second);
+        // SELECT operator may add its used variable
+        // to its own output type environment as 'nonMissableVariable' (not(is-missing($used_var))
+        // therefore we need perform variable substitution in its own type environment
+        substProducedVarInTypeEnvironment(op, pair);
         return null;
     }
 
     @Override
     public Void visitSubplanOperator(SubplanOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substInNestedPlans(pair.first, pair.second, op);
+        substInNestedPlans(op, pair.first, pair.second);
+        // do not call substProducedVarInTypeEnvironment() because the variables are produced by nested plans
         return null;
     }
 
     @Override
     public Void visitUnionOperator(UnionAllOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : varMap) {
-            if (t.first.equals(pair.first)) {
-                t.first = pair.second;
-            }
-            if (t.second.equals(pair.first)) {
-                t.second = pair.second;
-            }
-            if (t.third.equals(pair.first)) {
-                t.third = pair.second;
-            }
+        boolean producedVarFound = substUnionAllVariables(op.getVariableMappings(), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
         }
-        substVarTypes(op, pair);
         return null;
     }
 
     @Override
     public Void visitIntersectOperator(IntersectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        boolean hasExtraVars = op.hasExtraVariables();
-        substInArray(op.getOutputCompareVariables(), pair.first, pair.second);
-        if (hasExtraVars) {
-            substInArray(op.getOutputExtraVariables(), pair.first, pair.second);
+        boolean producedVarFound = substProducedVariables(op.getOutputCompareVariables(), pair.first, pair.second);
+        if (!producedVarFound) {
+            if (op.hasExtraVariables()) {
+                producedVarFound = substProducedVariables(op.getOutputExtraVariables(), pair.first, pair.second);
+            }
         }
-        for (int i = 0, n = op.getNumInput(); i < n; i++) {
-            substInArray(op.getInputCompareVariables(i), pair.first, pair.second);
-            if (hasExtraVars) {
-                substInArray(op.getInputExtraVariables(i), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            for (int i = 0, n = op.getNumInput(); i < n; i++) {
+                substUsedVariables(op.getInputCompareVariables(i), pair.first, pair.second);
+                if (op.hasExtraVariables()) {
+                    substUsedVariables(op.getInputExtraVariables(i), pair.first, pair.second);
+                }
             }
         }
         return null;
@@ -289,9 +306,11 @@
     @Override
     public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        substituteVarsForAbstractUnnestMapOp(op, pair);
-        if (op.getSelectCondition() != null) {
-            op.getSelectCondition().getValue().substituteVar(pair.first, pair.second);
+        boolean producedVarFound = substituteVarsForAbstractUnnestMapOp(op, pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            substUsedVariablesInExpr(op.getSelectCondition(), pair.first, pair.second);
         }
         return null;
     }
@@ -299,81 +318,290 @@
     @Override
     public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op,
             Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
-        substituteVarsForAbstractUnnestMapOp(op, pair);
+        boolean producedVarFound = substituteVarsForAbstractUnnestMapOp(op, pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        }
         return null;
     }
 
-    private void substituteVarsForAbstractUnnestMapOp(AbstractUnnestMapOperator op,
-            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        for (int i = 0; i < variables.size(); i++) {
-            if (variables.get(i) == pair.first) {
-                variables.set(i, pair.second);
-                return;
-            }
+    private boolean substituteVarsForAbstractUnnestMapOp(AbstractUnnestMapOperator op, LogicalVariable v1,
+            LogicalVariable v2) {
+        boolean producedVarFound = substProducedVariables(op.getVariables(), v1, v2);
+        if (!producedVarFound) {
+            substUsedVariablesInExpr(op.getExpressionRef(), v1, v2);
+            substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), v1, v2);
+            substUsedVariables(op.getMinFilterVars(), v1, v2);
+            substUsedVariables(op.getMaxFilterVars(), v1, v2);
         }
-        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
-        substVarTypes(op, pair);
+        return producedVarFound;
     }
 
     @Override
     public Void visitUnnestOperator(UnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        return visitUnnestNonMapOperator(op, pair);
+        boolean producedVarFound = substituteVarsForAbstractUnnestNonMapOp(op, pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        boolean producedVarFound = substituteVarsForAbstractUnnestNonMapOp(op, pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        }
+        return null;
+    }
+
+    private boolean substituteVarsForAbstractUnnestNonMapOp(AbstractUnnestNonMapOperator op, LogicalVariable v1,
+            LogicalVariable v2) {
+        boolean producedVarFound = substProducedVariables(op.getVariables(), v1, v2);
+        if (!producedVarFound) {
+            if (op.hasPositionalVariable() && op.getPositionalVariable().equals(v1)) {
+                op.setPositionalVariable(v2);
+                producedVarFound = true;
+            }
+        }
+        if (!producedVarFound) {
+            substUsedVariablesInExpr(op.getExpressionRef(), v1, v2);
+        }
+        return producedVarFound;
     }
 
     @Override
     public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
         return null;
     }
 
     @Override
     public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getExpressions(), pair.first, pair.second);
         return null;
     }
 
     @Override
     public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
-        op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
-        for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
+        substUsedVariablesInExpr(op.getPayloadExpression(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getKeyExpressions(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
         return null;
     }
 
-    private void subst(LogicalVariable v1, LogicalVariable v2,
-            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> varExprPairList) {
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : varExprPairList) {
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        // does not produce/use any variables
+        return null;
+    }
+
+    @Override
+    public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substUsedVariablesInExpr(op.getBranchingExpression(), pair.first, pair.second);
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        // does not produce/use any variables
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
+        boolean producedVarFound = false;
+        if (op.getOperation() == InsertDeleteUpsertOperator.Kind.UPSERT) {
+            if (op.getUpsertIndicatorVar() != null && op.getUpsertIndicatorVar().equals(pair.first)) {
+                op.setUpsertIndicatorVar(pair.second);
+                producedVarFound = true;
+            } else if (op.getBeforeOpRecordVar() != null && op.getBeforeOpRecordVar().equals(pair.first)) {
+                op.setPrevRecordVar(pair.second);
+                producedVarFound = true;
+            } else if (op.getBeforeOpFilterVar() != null && op.getBeforeOpFilterVar().equals(pair.first)) {
+                op.setPrevFilterVar(pair.second);
+                producedVarFound = true;
+            } else {
+                producedVarFound =
+                        substProducedVariables(op.getBeforeOpAdditionalNonFilteringVars(), pair.first, pair.second);
+            }
+        }
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            substUsedVariablesInExpr(op.getPayloadExpression(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getPrimaryKeyExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getAdditionalNonFilteringExpressions(), pair.first, pair.second);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
+        substUsedVariablesInExpr(op.getPrimaryKeyExpressions(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getSecondaryKeyExpressions(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getFilterExpression(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getUpsertIndicatorExpr(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getPrevSecondaryKeyExprs(), pair.first, pair.second);
+        substUsedVariablesInExpr(op.getPrevAdditionalFilteringExpression(), pair.first, pair.second);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        boolean producedVarFound = substProducedVariables(op.getTokenizeVars(), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            substUsedVariablesInExpr(op.getPrimaryKeyExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getSecondaryKeyExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFilterExpression(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getAdditionalFilteringExpressions(), pair.first, pair.second);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substUsedVariablesInExpr(op.getSideDataExpression(), pair.first, pair.second);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        // does not produce/use any variables
+        return null;
+    }
+
+    @Override
+    public Void visitDelegateOperator(DelegateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        // does not produce/use any variables
+        return null;
+    }
+
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        boolean producedVarFound =
+                substAssignVariables(op.getVariables(), op.getExpressions(), pair.first, pair.second);
+        if (producedVarFound) {
+            substProducedVarInTypeEnvironment(op, pair);
+        } else {
+            substUsedVariablesInExpr(op.getPartitionExpressions(), pair.first, pair.second);
+            for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
+                substUsedVariablesInExpr(p.second, pair.first, pair.second);
+            }
+            for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getFrameValueExpressions()) {
+                substUsedVariablesInExpr(p.second, pair.first, pair.second);
+            }
+            substUsedVariablesInExpr(op.getFrameStartExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFrameStartValidationExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFrameEndExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFrameEndValidationExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFrameExcludeExpressions(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFrameExcludeUnaryExpression(), pair.first, pair.second);
+            substUsedVariablesInExpr(op.getFrameOffsetExpression(), pair.first, pair.second);
+            substInNestedPlans(op, pair.first, pair.second);
+        }
+        return null;
+    }
+
+    private void substUsedVariablesInExpr(Mutable<ILogicalExpression> exprRef, LogicalVariable v1, LogicalVariable v2) {
+        if (exprRef != null && exprRef.getValue() != null) {
+            exprRef.getValue().substituteVar(v1, v2);
+        }
+    }
+
+    private void substUsedVariablesInExpr(List<Mutable<ILogicalExpression>> expressions, LogicalVariable v1,
+            LogicalVariable v2) {
+        if (expressions != null) {
+            for (Mutable<ILogicalExpression> exprRef : expressions) {
+                substUsedVariablesInExpr(exprRef, v1, v2);
+            }
+        }
+    }
+
+    private void substUsedVariables(List<LogicalVariable> variables, LogicalVariable v1, LogicalVariable v2) {
+        if (variables != null) {
+            for (int i = 0, n = variables.size(); i < n; i++) {
+                if (variables.get(i).equals(v1)) {
+                    variables.set(i, v2);
+                }
+            }
+        }
+    }
+
+    private boolean substProducedVariables(List<LogicalVariable> variables, LogicalVariable v1, LogicalVariable v2) {
+        if (variables != null) {
+            for (int i = 0, n = variables.size(); i < n; i++) {
+                if (variables.get(i).equals(v1)) {
+                    variables.set(i, v2);
+                    return true; // found produced var
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions,
+            LogicalVariable v1, LogicalVariable v2) {
+        for (int i = 0, n = variables.size(); i < n; i++) {
+            if (variables.get(i).equals(v1)) {
+                variables.set(i, v2);
+                return true; // found produced var
+            } else {
+                expressions.get(i).getValue().substituteVar(v1, v2);
+            }
+        }
+        return false;
+    }
+
+    private boolean substGbyVariables(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyPairList,
+            LogicalVariable v1, LogicalVariable v2) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gbyPairList) {
             if (ve.first != null && ve.first.equals(v1)) {
                 ve.first = v2;
-                return;
+                return true; // found produced var
             }
             ve.second.getValue().substituteVar(v1, v2);
         }
+        return false;
     }
 
-    private void substInArray(List<LogicalVariable> varArray, LogicalVariable v1, LogicalVariable v2) {
-        for (int i = 0; i < varArray.size(); i++) {
-            LogicalVariable v = varArray.get(i);
-            if (v == v1) {
-                varArray.set(i, v2);
+    private boolean substUnionAllVariables(List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap,
+            LogicalVariable v1, LogicalVariable v2) {
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : varMap) {
+            if (t.first.equals(v1)) {
+                t.first = v2;
+            }
+            if (t.second.equals(v1)) {
+                t.second = v2;
+            }
+            if (t.third.equals(v1)) {
+                t.third = v2;
+                return true; // found produced var
             }
         }
+        return false;
     }
 
-    private void substInNestedPlans(LogicalVariable v1, LogicalVariable v2, AbstractOperatorWithNestedPlans op)
+    private void substInNestedPlans(AbstractOperatorWithNestedPlans op, LogicalVariable v1, LogicalVariable v2)
             throws AlgebricksException {
         for (ILogicalPlan p : op.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
@@ -382,164 +610,14 @@
         }
     }
 
-    private void substAssignVariables(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions,
-            Pair<LogicalVariable, LogicalVariable> pair) {
-        int n = variables.size();
-        for (int i = 0; i < n; i++) {
-            if (variables.get(i).equals(pair.first)) {
-                variables.set(i, pair.second);
-            } else {
-                expressions.get(i).getValue().substituteVar(pair.first, pair.second);
-            }
-        }
-    }
-
-    @Override
-    public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
-            throws AlgebricksException {
-        op.substituteVar(arg.first, arg.second);
-        return null;
-    }
-
-    @Override
-    public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable, LogicalVariable> arg)
-            throws AlgebricksException {
-        op.substituteVar(arg.first, arg.second);
-        return null;
-    }
-
-    @Override
-    public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
-            throws AlgebricksException {
-        return null;
-    }
-
-    @Override
-    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op,
-            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
-        op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
-        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
-        return null;
-    }
-
-    @Override
-    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
-            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
-        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
-        return null;
-    }
-
-    @Override
-    public Void visitTokenizeOperator(TokenizeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
-        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
-        return null;
-    }
-
-    @Override
-    public Void visitForwardOperator(ForwardOperator op, Pair<LogicalVariable, LogicalVariable> arg)
-            throws AlgebricksException {
-        op.getSideDataExpression().getValue().substituteVar(arg.first, arg.second);
-        substVarTypes(op, arg);
-        return null;
-    }
-
-    @Override
-    public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
-        return null;
-    }
-
-    private void substVarTypes(ILogicalOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+    private void substProducedVarInTypeEnvironment(ILogicalOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         if (ctx == null) {
             return;
         }
         IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
         if (env != null) {
-            env.substituteProducedVariable(arg.first, arg.second);
+            env.substituteProducedVariable(pair.first, pair.second);
         }
     }
-
-    @Override
-    public Void visitDelegateOperator(DelegateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
-            throws AlgebricksException {
-        return null;
-    }
-
-    @Override
-    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
-        return visitUnnestNonMapOperator(op, pair);
-    }
-
-    private Void visitUnnestNonMapOperator(AbstractUnnestNonMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
-        List<LogicalVariable> variables = op.getVariables();
-        for (int i = 0; i < variables.size(); i++) {
-            if (variables.get(i) == pair.first) {
-                variables.set(i, pair.second);
-                return null;
-            }
-        }
-        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
-        substVarTypes(op, pair);
-        return null;
-    }
-
-    @Override
-    public Void visitWindowOperator(WindowOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
-        for (Mutable<ILogicalExpression> expr : op.getPartitionExpressions()) {
-            expr.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
-            p.second.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Pair<IOrder, Mutable<ILogicalExpression>> p : op.getFrameValueExpressions()) {
-            p.second.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> expr : op.getFrameStartExpressions()) {
-            expr.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> expr : op.getFrameStartValidationExpressions()) {
-            expr.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> expr : op.getFrameEndExpressions()) {
-            expr.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> expr : op.getFrameEndValidationExpressions()) {
-            expr.getValue().substituteVar(pair.first, pair.second);
-        }
-        for (Mutable<ILogicalExpression> expr : op.getFrameExcludeExpressions()) {
-            expr.getValue().substituteVar(pair.first, pair.second);
-        }
-        ILogicalExpression frameExcludeUnaryExpr = op.getFrameExcludeUnaryExpression().getValue();
-        if (frameExcludeUnaryExpr != null) {
-            frameExcludeUnaryExpr.substituteVar(pair.first, pair.second);
-        }
-        ILogicalExpression frameOffsetExpr = op.getFrameOffsetExpression().getValue();
-        if (frameOffsetExpr != null) {
-            frameOffsetExpr.substituteVar(pair.first, pair.second);
-        }
-        substAssignVariables(op.getVariables(), op.getExpressions(), pair);
-        substInNestedPlans(pair.first, pair.second, op);
-        substVarTypes(op, pair);
-        return null;
-    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
index 9d60370..c37c674 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/TypePropagationPolicy.java
@@ -31,21 +31,22 @@
 
         @Override
         public Object getVarType(LogicalVariable var, IMissableTypeComputer ntc,
-                List<LogicalVariable> nonNullVariableList, List<List<LogicalVariable>> correlatedNullableVariableLists,
-                ITypeEnvPointer... typeEnvs) throws AlgebricksException {
+                List<LogicalVariable> nonMissableVariableList,
+                List<List<LogicalVariable>> correlatedMissableVariableLists, ITypeEnvPointer... typeEnvs)
+                throws AlgebricksException {
             for (ITypeEnvPointer p : typeEnvs) {
                 IVariableTypeEnvironment env = p.getTypeEnv();
                 if (env == null) {
                     throw new AlgebricksException(
                             "Null environment for pointer " + p + " in getVarType for var=" + var);
                 }
-                Object t = env.getVarType(var, nonNullVariableList, correlatedNullableVariableLists);
+                Object t = env.getVarType(var, nonMissableVariableList, correlatedMissableVariableLists);
                 if (t != null) {
                     if (ntc != null && ntc.canBeMissing(t)) {
-                        for (List<LogicalVariable> list : correlatedNullableVariableLists) {
+                        for (List<LogicalVariable> list : correlatedMissableVariableLists) {
                             if (list.contains(var)) {
                                 for (LogicalVariable v : list) {
-                                    if (nonNullVariableList.contains(v)) {
+                                    if (nonMissableVariableList.contains(v)) {
                                         return ntc.getNonOptionalType(t);
                                     }
                                 }
@@ -63,16 +64,17 @@
 
         @Override
         public Object getVarType(LogicalVariable var, IMissableTypeComputer ntc,
-                List<LogicalVariable> nonNullVariableList, List<List<LogicalVariable>> correlatedNullableVariableLists,
-                ITypeEnvPointer... typeEnvs) throws AlgebricksException {
+                List<LogicalVariable> nonMissableVariableList,
+                List<List<LogicalVariable>> correlatedMissableVariableLists, ITypeEnvPointer... typeEnvs)
+                throws AlgebricksException {
             int n = typeEnvs.length;
             // Searches from the inner branch to the outer branch.
             // TODO(buyingyi): A split operator could lead to the case that the type for a variable could be
             // found in both inner and outer branches. Fix computeOutputTypeEnvironment() in ProjectOperator
             // and investigate why many test queries fail if only live variables' types are propagated.
             for (int i = n - 1; i >= 0; i--) {
-                Object t =
-                        typeEnvs[i].getTypeEnv().getVarType(var, nonNullVariableList, correlatedNullableVariableLists);
+                Object t = typeEnvs[i].getTypeEnv().getVarType(var, nonMissableVariableList,
+                        correlatedMissableVariableLists);
                 if (t == null) {
                     continue;
                 }
@@ -82,7 +84,7 @@
 
                 // inner branch
                 boolean nonMissingVarIsProduced = false;
-                for (LogicalVariable v : nonNullVariableList) {
+                for (LogicalVariable v : nonMissableVariableList) {
                     boolean toBreak = false;
                     if (v == var) {
                         nonMissingVarIsProduced = true;
@@ -106,6 +108,6 @@
     };
 
     public abstract Object getVarType(LogicalVariable var, IMissableTypeComputer ntc,
-            List<LogicalVariable> nonNullVariableList, List<List<LogicalVariable>> correlatedNullableVariableLists,
+            List<LogicalVariable> nonMissableVariableList, List<List<LogicalVariable>> correlatedMissableVariableLists,
             ITypeEnvPointer... typeEnvs) throws AlgebricksException;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
index 9d2a5da..27aa902 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/PropagatingTypeEnvironment.java
@@ -32,60 +32,74 @@
 
     private final TypePropagationPolicy policy;
 
-    private final IMissableTypeComputer nullableTypeComputer;
+    private final IMissableTypeComputer missableTypeComputer;
 
     private final ITypeEnvPointer[] envPointers;
 
-    private final List<LogicalVariable> nonNullVariables = new ArrayList<>();
+    private final List<LogicalVariable> nonMissableVariables = new ArrayList<>();
 
-    private final List<List<LogicalVariable>> correlatedNullableVariableLists = new ArrayList<>();
+    private final List<List<LogicalVariable>> correlatedMissableVariableLists = new ArrayList<>();
 
     public PropagatingTypeEnvironment(IExpressionTypeComputer expressionTypeComputer,
-            IMissableTypeComputer nullableTypeComputer, IMetadataProvider<?, ?> metadataProvider,
+            IMissableTypeComputer missableTypeComputer, IMetadataProvider<?, ?> metadataProvider,
             TypePropagationPolicy policy, ITypeEnvPointer[] envPointers) {
         super(expressionTypeComputer, metadataProvider);
-        this.nullableTypeComputer = nullableTypeComputer;
+        this.missableTypeComputer = missableTypeComputer;
         this.policy = policy;
         this.envPointers = envPointers;
     }
 
     @Override
     public Object getVarType(LogicalVariable var) throws AlgebricksException {
-        return getVarTypeFullList(var, nonNullVariables, correlatedNullableVariableLists);
+        return getVarTypeFullList(var, nonMissableVariables, correlatedMissableVariableLists);
     }
 
-    public List<LogicalVariable> getNonNullVariables() {
-        return nonNullVariables;
+    public List<LogicalVariable> getNonMissableVariables() {
+        return nonMissableVariables;
     }
 
     public List<List<LogicalVariable>> getCorrelatedMissableVariableLists() {
-        return correlatedNullableVariableLists;
+        return correlatedMissableVariableLists;
     }
 
     @Override
-    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariableList,
-            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
-        for (LogicalVariable v : nonNullVariables) {
-            if (!nonNullVariableList.contains(v)) {
-                nonNullVariableList.add(v);
+    public Object getVarType(LogicalVariable var, List<LogicalVariable> nonMissableVariableList,
+            List<List<LogicalVariable>> correlatedMissableVariableLists) throws AlgebricksException {
+        for (LogicalVariable v : nonMissableVariables) {
+            if (!nonMissableVariableList.contains(v)) {
+                nonMissableVariableList.add(v);
             }
         }
-        Object t = getVarTypeFullList(var, nonNullVariableList, correlatedNullableVariableLists);
-        for (List<LogicalVariable> list : this.correlatedNullableVariableLists) {
-            if (!correlatedNullableVariableLists.contains(list)) {
-                correlatedNullableVariableLists.add(list);
+        Object t = getVarTypeFullList(var, nonMissableVariableList, correlatedMissableVariableLists);
+        for (List<LogicalVariable> list : correlatedMissableVariableLists) {
+            if (!correlatedMissableVariableLists.contains(list)) {
+                correlatedMissableVariableLists.add(list);
             }
         }
         return t;
     }
 
-    private Object getVarTypeFullList(LogicalVariable var, List<LogicalVariable> nonNullVariableList,
-            List<List<LogicalVariable>> correlatedNullableVariableLists) throws AlgebricksException {
+    private Object getVarTypeFullList(LogicalVariable var, List<LogicalVariable> nonMissableVariableList,
+            List<List<LogicalVariable>> correlatedMissableVariableLists) throws AlgebricksException {
         Object t = varTypeMap.get(var);
         if (t != null) {
             return t;
         }
-        return policy.getVarType(var, nullableTypeComputer, nonNullVariableList, correlatedNullableVariableLists,
+        return policy.getVarType(var, missableTypeComputer, nonMissableVariableList, correlatedMissableVariableLists,
                 envPointers);
     }
+
+    @Override
+    public boolean substituteProducedVariable(LogicalVariable v1, LogicalVariable v2) throws AlgebricksException {
+        boolean result = super.substituteProducedVariable(v1, v2);
+        if (nonMissableVariables.remove(v1)) {
+            nonMissableVariables.add(v2);
+        }
+        for (List<LogicalVariable> missableVarList : correlatedMissableVariableLists) {
+            if (missableVarList.remove(v1)) {
+                missableVarList.add(v2);
+            }
+        }
+        return result;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 9ca2f6e..dd109ff 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -218,8 +218,14 @@
 
     public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars(
             ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException {
+        return deepCopyWithNewVars(root, ctx, true);
+    }
+
+    public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars(
+            ILogicalOperator root, IOptimizationContext ctx, boolean computeTypeEnvironment)
+            throws AlgebricksException {
         LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor =
-                new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx, true);
+                new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, computeTypeEnvironment ? ctx : null, true);
         ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
         return new Pair<>(newRoot, deepCopyVisitor.getInputToOutputVariableMapping());
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index c6589b0..c0d72bb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -285,13 +285,11 @@
     }
 
     public static boolean isCardinalityExactOne(ILogicalOperator operator) throws AlgebricksException {
-        CardinalityInferenceVisitor visitor = new CardinalityInferenceVisitor();
-        return operator.accept(visitor, null) == 1L;
+        return CardinalityInferenceVisitor.isCardinalityExactOne(operator);
     }
 
     public static boolean isCardinalityZeroOrOne(ILogicalOperator operator) throws AlgebricksException {
-        CardinalityInferenceVisitor visitor = new CardinalityInferenceVisitor();
-        return operator.accept(visitor, null) <= 1;
+        return CardinalityInferenceVisitor.isCardinalityZeroOrOne(operator);
     }
 
     /**
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
index 566c11d..2ed40fd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/config/AlgebricksConfig.java
@@ -30,4 +30,6 @@
     public static final boolean INDEX_ONLY_DEFAULT = true;
     public static final boolean SANITYCHECK_DEFAULT = false;
     public static final boolean EXTERNAL_FIELD_PUSHDOWN_DEFAULT = false;
+    public static final boolean SUBPLAN_MERGE_DEFAULT = true;
+    public static final boolean SUBPLAN_NESTEDPUSHDOWN_DEFAULT = true;
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index e725cce..bfa4298 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -42,6 +42,8 @@
     private static final String INDEX_ONLY = "INDEX_ONLY";
     private static final String SANITY_CHECK = "SANITY_CHECK";
     private static final String EXTERNAL_FIELD_PUSHDOWN = "EXTERNAL_FIELD_PUSHDOWN";
+    private static final String SUBPLAN_MERGE = "SUBPLAN_MERGE";
+    private static final String SUBPLAN_NESTEDPUSHDOWN = "SUBPLAN_NESTEDPUSHDOWN";
 
     private Properties properties = new Properties();
 
@@ -199,6 +201,22 @@
         setBoolean(EXTERNAL_FIELD_PUSHDOWN, externalFieldPushDown);
     }
 
+    public boolean getSubplanMerge() {
+        return getBoolean(SUBPLAN_MERGE, AlgebricksConfig.SUBPLAN_MERGE_DEFAULT);
+    }
+
+    public void setSubplanMerge(boolean value) {
+        setBoolean(SUBPLAN_MERGE, value);
+    }
+
+    public boolean getSubplanNestedPushdown() {
+        return getBoolean(SUBPLAN_NESTEDPUSHDOWN, AlgebricksConfig.SUBPLAN_NESTEDPUSHDOWN_DEFAULT);
+    }
+
+    public void setSubplanNestedPushdown(boolean value) {
+        setBoolean(SUBPLAN_NESTEDPUSHDOWN, value);
+    }
+
     private void setInt(String property, int value) {
         properties.setProperty(property, Integer.toString(value));
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
index e2576ba..9af3608 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -84,36 +83,32 @@
             }
 
             SubplanOperator subplan = (SubplanOperator) op1;
-            Set<LogicalVariable> usedVarsUp = new ListSet<>();
-            OperatorPropertiesUtil.getFreeVariablesInPath(rootRef.getValue(), subplan, usedVarsUp);
             // TODO(buyingyi): figure out the rewriting for subplan operators with multiple subplans.
             if (subplan.getNestedPlans().size() != 1) {
                 continue;
             }
 
+            Set<LogicalVariable> usedVarsUp = new ListSet<>();
+            OperatorPropertiesUtil.getFreeVariablesInPath(rootRef.getValue(), subplan, usedVarsUp);
+
             // Recursively rewrites the pipelines inside a nested subplan.
             for (Mutable<ILogicalOperator> nestedRootRef : subplan.getNestedPlans().get(0).getRoots()) {
-                changed |= this.rewriteForOperator(nestedRootRef, nestedRootRef, context);
+                changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
             }
 
-            ILogicalOperator subplanInputOperator = subplan.getInputs().get(0).getValue();
+            Mutable<ILogicalOperator> subplanInputOperatorRef = subplan.getInputs().get(0);
+            ILogicalOperator subplanInputOperator = subplanInputOperatorRef.getValue();
             Set<LogicalVariable> subplanInputVars = new ListSet<>();
             VariableUtilities.getLiveVariables(subplanInputOperator, subplanInputVars);
-            int subplanInputVarSize = subplanInputVars.size();
-            subplanInputVars.removeAll(usedVarsUp);
-            // Makes sure the free variables are only used in the subplan.
-            if (subplanInputVars.size() < subplanInputVarSize) {
+            // Make sure that subplan input vars are only used inside the subplan (i.e. not used above the subplan op)
+            if (!OperatorPropertiesUtil.disjoint(subplanInputVars, usedVarsUp)) {
                 continue;
             }
-            Set<LogicalVariable> freeVars = new ListSet<>();
-            OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVars);
-            boolean cardinalityOne = isCardinalityOne(subplan.getInputs().get(0), freeVars);
-            if (!cardinalityOne) {
+            if (!OperatorPropertiesUtil.isCardinalityExactOne(subplanInputOperator)) {
                 continue;
             }
-            /** If the cardinality of freeVars in the subplan is one, the subplan can be removed. */
+            /* The subplan can be removed. */
             ILogicalPlan plan = subplan.getNestedPlans().get(0);
-
             List<Mutable<ILogicalOperator>> rootRefs = plan.getRoots();
             // TODO(buyingyi): investigate the case of multi-root plans.
             if (rootRefs.size() != 1) {
@@ -132,62 +127,4 @@
 
         return changed;
     }
-
-    /**
-     * Whether the cardinality of the input free variables are one.
-     *
-     * @param opRef
-     *            the operator to be checked (including its input operators)
-     * @param freeVars
-     *            variables to be checked for produced operators
-     * @return true if every input variable has cardinality one; false otherwise.
-     * @throws AlgebricksException
-     */
-    private boolean isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars)
-            throws AlgebricksException {
-        Set<LogicalVariable> varsWithCardinalityOne = new ListSet<>();
-        Set<LogicalVariable> varsLiveAtUnnestAndJoin = new ListSet<>();
-        isCardinalityOne(opRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin);
-        varsWithCardinalityOne.removeAll(varsLiveAtUnnestAndJoin);
-        return varsWithCardinalityOne.equals(freeVars);
-    }
-
-    /**
-     * Recursively adding variables which has cardinality one into the input free variable set.
-     *
-     * @param opRef
-     *            , the current operator reference.
-     * @param freeVars
-     *            , a set of variables.
-     * @param varsWithCardinalityOne
-     *            , variables in the free variable set with cardinality one at the time they are created.
-     * @param varsLiveAtUnnestAndJoin
-     *            , live variables at Unnest and Join. The cardinalities of those variables can become more than one
-     *            even if their cardinalities were one at the time those variables were created.
-     * @throws AlgebricksException
-     */
-    private void isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars,
-            Set<LogicalVariable> varsWithCardinalityOne, Set<LogicalVariable> varsLiveAtUnnestAndJoin)
-            throws AlgebricksException {
-        AbstractLogicalOperator operator = (AbstractLogicalOperator) opRef.getValue();
-        List<LogicalVariable> liveVars = new ArrayList<>();
-        VariableUtilities.getLiveVariables(operator, liveVars);
-
-        if (OperatorPropertiesUtil.isCardinalityZeroOrOne(operator)) {
-            for (LogicalVariable liveVar : liveVars) {
-                if (freeVars.contains(liveVar)) {
-                    varsWithCardinalityOne.add(liveVar);
-                }
-            }
-        } else {
-            // Operators with the following tags could still have have cardinality one,
-            // hence they are in this "else" branch.
-            if (operator.getOperatorTag() == LogicalOperatorTag.UNNEST
-                    || operator.getOperatorTag() == LogicalOperatorTag.INNERJOIN
-                    || operator.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
-                VariableUtilities.getLiveVariables(operator, varsLiveAtUnnestAndJoin);
-            }
-        }
-    }
-
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
index 53b8eff..3f0eea1 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
@@ -26,7 +26,6 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -110,21 +109,25 @@
              * Expends the input and roots into a DAG of nested loop joins.
              * Though joins should be left-outer joins, a left-outer join with condition TRUE is equivalent to an inner join.
              **/
-            Mutable<ILogicalExpression> expr = new MutableObject<ILogicalExpression>(ConstantExpression.TRUE);
             Mutable<ILogicalOperator> nestedRootRef = nestedRoots.get(0);
-            InnerJoinOperator join =
-                    new InnerJoinOperator(expr, new MutableObject<ILogicalOperator>(subplanInput), nestedRootRef);
+            InnerJoinOperator join = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE),
+                    new MutableObject<>(subplanInput), nestedRootRef);
             join.setSourceLocation(sourceLoc);
 
             /** rewrite the nested tuple source to be empty tuple source */
             rewriteNestedTupleSource(nestedRootRef, context);
+            context.computeAndSetTypeEnvironmentForOperator(join);
 
             for (int i = 1; i < nestedRoots.size(); i++) {
-                join = new InnerJoinOperator(expr, new MutableObject<ILogicalOperator>(join), nestedRoots.get(i));
+                nestedRootRef = nestedRoots.get(i);
+                join = new InnerJoinOperator(new MutableObject<>(ConstantExpression.TRUE), new MutableObject<>(join),
+                        nestedRootRef);
                 join.setSourceLocation(sourceLoc);
+                rewriteNestedTupleSource(nestedRootRef, context);
+                context.computeAndSetTypeEnvironmentForOperator(join);
             }
             op1.getInputs().get(index).setValue(join);
-            context.computeAndSetTypeEnvironmentForOperator(join);
+
             rewritten = true;
         }
         return rewritten;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 420bf12..4113dbd 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -25,8 +25,10 @@
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -39,7 +41,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -49,14 +51,16 @@
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 
 /**
- * This rule pushes an array of subplans on top of a group-by into the
- * nested plan of the group-by.
+ * This rule pushes an array of subplans on top of a group-by or a subplan into the
+ * nested plan of the group-by / subplan
  *
  * @author yingyib
+ *
+ * TODO(dmitry):rename to PushSubplanIntoGroupByOrSubplanRule
  */
-
 public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
     /** The pointer to the topmost operator */
     private Mutable<ILogicalOperator> rootRef;
@@ -99,134 +103,171 @@
                 subplans.addFirst(currentSubplan);
                 op = op.getInputs().get(0).getValue();
             }
-            // Only processes the case a group-by operator is the input of the subplan operators.
-            if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
-                continue;
-            }
-            GroupByOperator gby = (GroupByOperator) op;
-            // Recursively rewrites the pipelines inside a nested subplan.
-            for (ILogicalPlan subplan : gby.getNestedPlans()) {
-                for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
-                    changed |= rewriteForOperator(nestedRootRef, nestedRootRef.getValue(), context);
+            if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+                // Process the case a group-by operator is the input of the subplan operators.
+                GroupByOperator gby = (GroupByOperator) op;
+                // Recursively rewrites the pipelines inside a nested subplan.
+                for (ILogicalPlan gbyNestedPlan : gby.getNestedPlans()) {
+                    for (Mutable<ILogicalOperator> gbyNestedPlanRootRef : gbyNestedPlan.getRoots()) {
+                        changed |= rewriteForOperator(gbyNestedPlanRootRef, gbyNestedPlanRootRef.getValue(), context);
+                    }
+                }
+                changed |= pushSubplansIntoGroupByOrSubplan(rootRef, parentOperator, i, subplans, gby, context);
+            } else if (subplans.size() > 1 && context.getPhysicalOptimizationConfig().getSubplanMerge()) {
+                // Process the case a subplan operator is the input of the subplan operators.
+                SubplanOperator destOp = subplans.removeFirst();
+                if (!context.checkIfInDontApplySet(this, destOp)) {
+                    changed |= pushSubplansIntoGroupByOrSubplan(rootRef, parentOperator, i, subplans, destOp, context);
                 }
             }
-            changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, i, subplans, gby, context);
         }
         return changed;
     }
 
-    // Pushes subplans into the group by operator.
-    private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator> currentRootRef, ILogicalOperator parentOperator,
-            int parentChildIdx, Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context)
-            throws AlgebricksException {
+    // Pushes subplans into operator with nested plans (group by or subplan).
+    private boolean pushSubplansIntoGroupByOrSubplan(Mutable<ILogicalOperator> currentRootRef,
+            ILogicalOperator parentOperator, int parentChildIdx, Deque<SubplanOperator> subplans,
+            AbstractOperatorWithNestedPlans destOp, IOptimizationContext context) throws AlgebricksException {
         boolean changed = false;
-        List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>();
-        List<ILogicalPlan> originalNestedPlansInGby = gby.getNestedPlans();
+        List<ILogicalPlan> newDestOpNestedPlans = new ArrayList<>();
 
-        // Adds all original subplans from the group by.
-        for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
-            newGbyNestedPlans.add(gbyNestedPlanOriginal);
+        Deque<Set<LogicalVariable>> freeVarsInSubplans = new ArrayDeque<>(subplans.size());
+        for (SubplanOperator subplan : subplans) {
+            Set<LogicalVariable> freeVarsInSubplan = new ListSet<>();
+            OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVarsInSubplan);
+            freeVarsInSubplans.addLast(freeVarsInSubplan);
         }
 
-        // Tries to push subplans into the group by.
-        Iterator<SubplanOperator> subplanOperatorIterator = subplans.iterator();
-        while (subplanOperatorIterator.hasNext()) {
-            SubplanOperator subplan = subplanOperatorIterator.next();
-            Iterator<ILogicalPlan> subplanNestedPlanIterator = subplan.getNestedPlans().iterator();
-            while (subplanNestedPlanIterator.hasNext()) {
-                ILogicalPlan subplanNestedPlan = subplanNestedPlanIterator.next();
-                List<Mutable<ILogicalOperator>> upperSubplanRootRefs = subplanNestedPlan.getRoots();
-                Iterator<Mutable<ILogicalOperator>> upperSubplanRootRefIterator = upperSubplanRootRefs.iterator();
-                while (upperSubplanRootRefIterator.hasNext()) {
-                    Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next();
+        Set<LogicalVariable> liveVarsBeforeDestOp = new ListSet<>();
+        ILogicalOperator destOpInput = destOp.getInputs().get(0).getValue();
+        VariableUtilities.getLiveVariables(destOpInput, liveVarsBeforeDestOp);
 
-                    if (downToNts(rootOpRef) == null) {
+        Set<LogicalVariable> liveVarsInDestOpNestedPlan = new ListSet<>();
+        Set<LogicalVariable> upperSubplanFreeVarsTmp = new ListSet<>();
+
+        // Tries to push subplans into the destination operator (destop)
+        for (Iterator<SubplanOperator> subplanOperatorIterator = subplans.iterator(); subplanOperatorIterator
+                .hasNext();) {
+            SubplanOperator upperSubplan = subplanOperatorIterator.next();
+            Set<LogicalVariable> upperSubplanFreeVars = freeVarsInSubplans.removeFirst();
+
+            for (Iterator<ILogicalPlan> upperSubplanNestedPlanIter =
+                    upperSubplan.getNestedPlans().iterator(); upperSubplanNestedPlanIter.hasNext();) {
+                ILogicalPlan upperSubplanNestedPlan = upperSubplanNestedPlanIter.next();
+                List<Mutable<ILogicalOperator>> upperSubplanRootRefs = upperSubplanNestedPlan.getRoots();
+                for (Iterator<Mutable<ILogicalOperator>> upperSubplanRootRefIter =
+                        upperSubplanRootRefs.iterator(); upperSubplanRootRefIter.hasNext();) {
+                    Mutable<ILogicalOperator> upperSubplanRootRef = upperSubplanRootRefIter.next();
+                    Mutable<ILogicalOperator> upperSubplanNtsRef = downToNts(upperSubplanRootRef);
+                    if (upperSubplanNtsRef == null) {
                         continue;
                     }
 
-                    // Collects free variables in the root operator of a nested plan and its descent.
-                    Set<LogicalVariable> freeVars = new ListSet<>();
-                    OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(),
-                            freeVars);
-
-                    // Checks whether the above freeVars are all contained in live variables * of one nested plan
-                    // inside the group-by operator. If yes, then the subplan can be pushed into the nested plan
-                    // of the group-by.
-                    for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
-                        ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(gbyNestedPlanOriginal, context);
-                        List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
-                        for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
-                            // Sets the nts for a original subplan.
-                            Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex);
-                            Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
-                            if (originalGbyNtsRef == null) {
-                                continue;
-                            }
-                            NestedTupleSourceOperator originalNts =
-                                    (NestedTupleSourceOperator) originalGbyNtsRef.getValue();
-                            originalNts.setDataSourceReference(new MutableObject<>(gby));
-
-                            // Pushes a new subplan if possible.
-                            Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
-                            Set<LogicalVariable> liveVars = new ListSet<>();
-                            VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
-                            if (!liveVars.containsAll(freeVars)) {
+                    loop_dest_op_nested_plans: for (ILogicalPlan originalDestOpNestedPlan : destOp.getNestedPlans()) {
+                        for (Mutable<ILogicalOperator> originalDestOpNestedPlanRootRef : originalDestOpNestedPlan
+                                .getRoots()) {
+                            if (downToNts(originalDestOpNestedPlanRootRef) == null) {
                                 continue;
                             }
 
-                            AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
-                            for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
-                                if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
-                                    aggOp.getVariables().remove(varIndex);
-                                    aggOp.getExpressions().remove(varIndex);
+                            // Check that the upper subplan's freeVars contains only
+                            // 1. live variables from the current destop's nested plan (must have, otherwise we exit),
+                            // 2. (optionally) live variables before the destop.
+                            // If yes, then the subplan could be pushed into the destop's nested plan.
+                            // In case #2 above we push upper plan into the nested plan as a subplan,
+                            // otherwise (no live variables before destop are used by upper subplan)
+                            // we merge the upper supplan into the destop's nested plan
+                            upperSubplanFreeVarsTmp.clear();
+                            upperSubplanFreeVarsTmp.addAll(upperSubplanFreeVars);
+
+                            liveVarsInDestOpNestedPlan.clear();
+                            VariableUtilities.getLiveVariables(originalDestOpNestedPlanRootRef.getValue(),
+                                    liveVarsInDestOpNestedPlan);
+                            if (!upperSubplanFreeVarsTmp.removeAll(liveVarsInDestOpNestedPlan)) {
+                                // upper subplan's freeVars doesn't contain any live variables of
+                                // the current destop's nested plan => exit
+                                continue;
+                            }
+
+                            boolean needInnerSubplan = false;
+                            if (!upperSubplanFreeVarsTmp.isEmpty()) {
+                                if (!context.getPhysicalOptimizationConfig().getSubplanNestedPushdown()) {
+                                    continue;
+                                }
+                                upperSubplanFreeVarsTmp.removeAll(liveVarsBeforeDestOp);
+                                if (!upperSubplanFreeVarsTmp.isEmpty()) {
+                                    continue;
+                                }
+                                needInnerSubplan = true;
+                            }
+
+                            // We can push the current subplan into the destop's nested plan
+
+                            // Copy the original nested pipeline inside the group-by.
+                            // (don't compute type environment for each operator, we'll do it later)
+                            Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedDestOpNestedPlanRootRefAndVarMap =
+                                    OperatorManipulationUtil.deepCopyWithNewVars(
+                                            originalDestOpNestedPlanRootRef.getValue(), context, false);
+                            ILogicalOperator copiedDestOpNestedPlanRootRef =
+                                    copiedDestOpNestedPlanRootRefAndVarMap.first;
+
+                            AggregateOperator originalAggOp =
+                                    (AggregateOperator) originalDestOpNestedPlanRootRef.getValue();
+                            AggregateOperator copiedAggOp = (AggregateOperator) copiedDestOpNestedPlanRootRef;
+                            for (int varIndex = originalAggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
+                                if (!upperSubplanFreeVars.contains(originalAggOp.getVariables().get(varIndex))) {
+                                    copiedAggOp.getVariables().remove(varIndex);
+                                    copiedAggOp.getExpressions().remove(varIndex);
                                 }
                             }
 
-                            // Copy the original nested pipeline inside the group-by.
-                            Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedAggOpAndVarMap =
-                                    OperatorManipulationUtil.deepCopyWithNewVars(aggOp, context);
-                            ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.first;
-
-                            // Substitutes variables in the upper nested pipe line.
-                            VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(),
-                                    copiedAggOpAndVarMap.second, context);
+                            // Substitutes variables in the upper nested plan.
+                            ILogicalOperator upperSubplanRoot = upperSubplanRootRef.getValue();
+                            VariableUtilities.substituteVariablesInDescendantsAndSelf(upperSubplanRoot,
+                                    copiedDestOpNestedPlanRootRefAndVarMap.second, context);
 
                             // Does the actual push.
-                            Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
-                            ntsRef.setValue(newBottomAgg);
-                            gbyRootOpRef.setValue(rootOpRef.getValue());
+                            Mutable<ILogicalOperator> copiedDestOpNestedPlanNtsRef = Objects
+                                    .requireNonNull(downToNts(new MutableObject<>(copiedDestOpNestedPlanRootRef)));
+                            NestedTupleSourceOperator copiedDestOpNestedPlanNts =
+                                    (NestedTupleSourceOperator) copiedDestOpNestedPlanNtsRef.getValue();
 
-                            // Sets the nts for a new pushed plan.
-                            Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(new MutableObject<>(newBottomAgg));
-                            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef.getValue();
-                            nts.setDataSourceReference(new MutableObject<>(gby));
+                            if (needInnerSubplan) {
+                                SubplanOperator newInnerSubplan = new SubplanOperator(copiedDestOpNestedPlanRootRef);
+                                NestedTupleSourceOperator newNts =
+                                        new NestedTupleSourceOperator(new MutableObject<>(destOp));
+                                newInnerSubplan.getInputs().add(new MutableObject<>(newNts));
+                                copiedDestOpNestedPlanNts.setDataSourceReference(new MutableObject<>(newInnerSubplan));
+                                upperSubplanNtsRef.setValue(newInnerSubplan);
+                            } else {
+                                copiedDestOpNestedPlanNts.setDataSourceReference(new MutableObject<>(destOp));
+                                upperSubplanNtsRef.setValue(copiedDestOpNestedPlanRootRef);
+                            }
 
-                            OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOpRef.getValue(), context);
-                            newGbyNestedPlans.add(new ALogicalPlanImpl(rootOpRef));
-
-                            upperSubplanRootRefIterator.remove();
+                            newDestOpNestedPlans.add(new ALogicalPlanImpl(new MutableObject<>(upperSubplanRoot)));
+                            upperSubplanRootRefIter.remove();
                             changed = true;
-                            break;
+                            break loop_dest_op_nested_plans;
                         }
                     }
                 }
 
                 if (upperSubplanRootRefs.isEmpty()) {
-                    subplanNestedPlanIterator.remove();
+                    upperSubplanNestedPlanIter.remove();
                     changed = true;
                 }
             }
-            if (subplan.getNestedPlans().isEmpty()) {
+            if (upperSubplan.getNestedPlans().isEmpty()) {
                 subplanOperatorIterator.remove();
                 changed = true;
             }
         }
 
-        // Resets the nested subplans for the group-by operator.
-        gby.getNestedPlans().clear();
-        gby.getNestedPlans().addAll(newGbyNestedPlans);
+        if (!changed) {
+            return false;
+        }
 
-        // Connects the group-by operator with its parent operator.
+        // Connects the destination operator with its parent operator.
         ILogicalOperator parent;
         int childIdx;
         if (!subplans.isEmpty()) {
@@ -236,15 +277,29 @@
             parent = parentOperator;
             childIdx = parentChildIdx;
         }
-        parent.getInputs().get(childIdx).setValue(gby);
+        parent.getInputs().get(childIdx).setValue(destOp);
 
-        // Removes unnecessary pipelines inside the group by operator.
-        changed |= cleanup(currentRootRef.getValue(), gby);
+        // Add new nested plans into the destination operator
+        destOp.getNestedPlans().addAll(newDestOpNestedPlans);
+        // Remove unnecessary nested plans from the destination operator.
+        cleanup(currentRootRef.getValue(), destOp);
+
+        // If subplan pushdown results in destop subplan operator with multiple roots then
+        // we break destop subplan operator into separate subplan operators.
+        if (destOp.getOperatorTag() == LogicalOperatorTag.SUBPLAN && destOp.getNumberOfRoots() > 1) {
+            splitMultiRootSubplan((SubplanOperator) destOp, context);
+        }
 
         // Computes type environments.
-        context.computeAndSetTypeEnvironmentForOperator(gby);
+        for (ILogicalPlan nestedPlan : destOp.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> rootOp : nestedPlan.getRoots()) {
+                OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOp.getValue(), context);
+            }
+        }
+        context.computeAndSetTypeEnvironmentForOperator(destOp);
         context.computeAndSetTypeEnvironmentForOperator(parent);
-        return changed;
+
+        return true;
     }
 
     /**
@@ -252,17 +307,14 @@
      *
      * @param rootOp,
      *            the root operator of a plan or nested plan.
-     * @param gby,
-     *            the group-by operator.
-     * @throws AlgebricksException
+     * @param destOp,
+     *            the operator with nested plans that needs to be cleaned up.
      */
-    private boolean cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws AlgebricksException {
-        boolean changed = false;
+    private void cleanup(ILogicalOperator rootOp, AbstractOperatorWithNestedPlans destOp) throws AlgebricksException {
         Set<LogicalVariable> freeVars = new HashSet<>();
-        OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars);
-        Iterator<ILogicalPlan> nestedPlanIterator = gby.getNestedPlans().iterator();
-        while (nestedPlanIterator.hasNext()) {
-            ILogicalPlan nestedPlan = nestedPlanIterator.next();
+        OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, destOp, freeVars);
+        for (Iterator<ILogicalPlan> nestedPlanIter = destOp.getNestedPlans().iterator(); nestedPlanIter.hasNext();) {
+            ILogicalPlan nestedPlan = nestedPlanIter.next();
             Iterator<Mutable<ILogicalOperator>> nestRootRefIterator = nestedPlan.getRoots().iterator();
             while (nestRootRefIterator.hasNext()) {
                 Mutable<ILogicalOperator> nestRootRef = nestRootRefIterator.next();
@@ -271,20 +323,52 @@
                     if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
                         aggOp.getVariables().remove(varIndex);
                         aggOp.getExpressions().remove(varIndex);
-                        changed = true;
                     }
                 }
                 if (aggOp.getVariables().isEmpty()) {
                     nestRootRefIterator.remove();
-                    changed = true;
                 }
             }
             if (nestedPlan.getRoots().isEmpty()) {
-                nestedPlanIterator.remove();
-                changed = true;
+                nestedPlanIter.remove();
             }
         }
-        return changed;
+    }
+
+    /**
+     * Split multi-root subplan operator into separate subplan operators each with a single root
+     */
+    private void splitMultiRootSubplan(SubplanOperator destOp, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator currentInputOp = destOp.getInputs().get(0).getValue();
+        LinkedList<Mutable<ILogicalOperator>> destOpRootRefs = destOp.allRootsInReverseOrder();
+        for (;;) {
+            Mutable<ILogicalOperator> destOpRootRef = destOpRootRefs.removeFirst();
+            ILogicalOperator destOpRoot = destOpRootRef.getValue();
+            if (!destOpRootRefs.isEmpty()) {
+                SubplanOperator newSubplanOp = new SubplanOperator(destOpRoot);
+                newSubplanOp.setSourceLocation(destOp.getSourceLocation());
+                newSubplanOp.getInputs().add(new MutableObject<>(currentInputOp));
+
+                Mutable<ILogicalOperator> ntsRef = downToNts(destOpRootRef);
+                if (ntsRef == null) {
+                    throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE, destOpRoot.getSourceLocation(), "");
+                }
+                ((NestedTupleSourceOperator) ntsRef.getValue()).getDataSourceReference().setValue(newSubplanOp);
+
+                OperatorManipulationUtil.computeTypeEnvironmentBottomUp(destOpRoot, context);
+                context.computeAndSetTypeEnvironmentForOperator(newSubplanOp);
+                context.addToDontApplySet(this, newSubplanOp);
+                currentInputOp = newSubplanOp;
+            } else {
+                destOp.getNestedPlans().clear();
+                destOp.getNestedPlans().add(new ALogicalPlanImpl(new MutableObject<>(destOpRoot)));
+                destOp.getInputs().clear();
+                destOp.getInputs().add(new MutableObject<>(currentInputOp));
+                context.addToDontApplySet(this, destOp);
+                break;
+            }
+        }
     }
 
     private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) {
@@ -297,4 +381,4 @@
         }
         return null;
     }
-}
+}
\ No newline at end of file