[ASTERIXDB-2051][COMP] Fix PushSubplanIntoGroupByRule for complex cases.

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- re-implement PushSubplanIntoGroupByRule and let it handle general cases;
- add an option to LogicalOperatorDeepCopyWithNewVariablesVisitor for
  not re-mapping free variables in the given plan subtree.

Change-Id: I969c40112be0506981357a9c41bf9675ae12ffb9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1992
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 725de12..19c6da7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -629,13 +629,13 @@
 
         // Create first copy.
         LogicalOperatorDeepCopyWithNewVariablesVisitor firstDeepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
-                context, context, newProbeSubTreeVarMap);
+                context, context, newProbeSubTreeVarMap, true);
         ILogicalOperator newProbeSubTree = firstDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
         inferTypes(newProbeSubTree, context);
         Mutable<ILogicalOperator> newProbeSubTreeRootRef = new MutableObject<ILogicalOperator>(newProbeSubTree);
         // Create second copy.
         LogicalOperatorDeepCopyWithNewVariablesVisitor secondDeepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
-                context, context, joinInputSubTreeVarMap);
+                context, context, joinInputSubTreeVarMap, true);
         ILogicalOperator joinInputSubTree = secondDeepCopyVisitor.deepCopy(probeSubTree.getRoot());
         inferTypes(joinInputSubTree, context);
         probeSubTree.getRootRef().setValue(joinInputSubTree);
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
new file mode 100644
index 0000000..4b94bf6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-810-3.sqlpp
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description  : This test case is to verify the fix for issue810
+ * https://code.google.com/p/asterixdb/issues/detail?id=810
+ * Expected Res : SUCCESS
+ * Date         : 16th Nov. 2014
+ */
+
+DROP  DATAVERSE tpch IF EXISTS;
+CREATE DATAVERSE tpch;
+
+USE tpch;
+
+
+CREATE TYPE LineItemType AS CLOSED {
+  l_orderkey : integer,
+  l_partkey : integer,
+  l_suppkey : integer,
+  l_linenumber : integer,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+};
+
+CREATE DATASET LineItem(LineItemType) PRIMARY KEY l_orderkey,l_linenumber;
+
+
+SELECT l_returnflag AS l_returnflag,
+       l_linestatus AS l_linestatus,
+       coll_count(cheap) AS count_cheaps,
+       coll_count(expensive) AS count_expensives
+FROM LineItem AS l
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag,l.l_linestatus AS l_linestatus
+GROUP AS g
+LET cheap = (
+      SELECT ELEMENT m
+      FROM (FROM g SELECT VALUE l) AS m
+      WHERE m.l_discount > 0.05
+),
+expensive = (
+      SELECT ELEMENT g.l
+      FROM g
+      WHERE g.l.l_discount <= 0.05
+)
+ORDER BY l_returnflag,l_linestatus
+;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
index d9ce6c8..26d6103 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/inlined_q18_large_volume_customer.plan
@@ -8,12 +8,12 @@
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STABLE_SORT [topK: 100] [$$o_totalprice(DESC), $$o_orderdate(ASC)]  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- SORT_GROUP_BY[$$72, $$73]  |PARTITIONED|
+                    -- SORT_GROUP_BY[$$73, $$74]  |PARTITIONED|
                             {
                               -- AGGREGATE  |LOCAL|
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                             }
-                      -- HASH_PARTITION_EXCHANGE [$$72, $$73]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$73, $$74]  |PARTITIONED|
                         -- SORT_GROUP_BY[$$56, $$57]  |PARTITIONED|
                                 {
                                   -- AGGREGATE  |LOCAL|
@@ -49,12 +49,12 @@
                                             -- STREAM_PROJECT  |PARTITIONED|
                                               -- STREAM_SELECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- SORT_GROUP_BY[$$69]  |PARTITIONED|
+                                                  -- SORT_GROUP_BY[$$70]  |PARTITIONED|
                                                           {
                                                             -- AGGREGATE  |LOCAL|
                                                               -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                           }
-                                                    -- HASH_PARTITION_EXCHANGE [$$69]  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$70]  |PARTITIONED|
                                                       -- PRE_CLUSTERED_GROUP_BY[$$58]  |PARTITIONED|
                                                               {
                                                                 -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
index ab10f2d..f2adbf6 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1263.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$27]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$28]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -20,9 +20,9 @@
                             -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$27(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$27]  |PARTITIONED|
-                  -- SORT_GROUP_BY[$$18, $$24]  |PARTITIONED|
+              -- STABLE_SORT [$$28(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$28]  |PARTITIONED|
+                  -- SORT_GROUP_BY[$$18, $$25]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
                               -- NESTED_TUPLE_SOURCE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
index cad2fbb..238736a 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-2.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$42, $$43]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$44, $$45]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -13,8 +13,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$42(ASC), $$43(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$42, $$43]  |PARTITIONED|
+              -- STABLE_SORT [$$44(ASC), $$45(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$44, $$45]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$30, $$31]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
new file mode 100644
index 0000000..9a7df35
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810-3.plan
@@ -0,0 +1,37 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$44, $$45]  |PARTITIONED|
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+                  {
+                    -- AGGREGATE  |LOCAL|
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                  }
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STABLE_SORT [$$44(ASC), $$45(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$44, $$45]  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$31, $$32]  |PARTITIONED|
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STABLE_SORT [$$31(ASC), $$32(ASC)]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
index 1c56f1c..18b4218 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-810.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$42, $$43]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$44, $$45]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -13,8 +13,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$42(ASC), $$43(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$42, $$43]  |PARTITIONED|
+              -- STABLE_SORT [$$44(ASC), $$45(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$44, $$45]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$32, $$33]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
@@ -31,9 +31,7 @@
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- ASSIGN  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- DATASOURCE_SCAN  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
index 13fb7e1..2d75ff0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue697.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-          -- SORT_GROUP_BY[$$19]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$20]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$19]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$20]  |PARTITIONED|
               -- PRE_CLUSTERED_GROUP_BY[$$16]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
index fe1f67a..4405b9d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue785.plan
@@ -13,12 +13,12 @@
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
               -- STABLE_SORT [$$nation_key(ASC)]  |PARTITIONED|
                 -- HASH_PARTITION_EXCHANGE [$$nation_key]  |PARTITIONED|
-                  -- SORT_GROUP_BY[$$69, $$70]  |PARTITIONED|
+                  -- SORT_GROUP_BY[$$70, $$71]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
                               -- NESTED_TUPLE_SOURCE  |LOCAL|
                           }
-                    -- HASH_PARTITION_EXCHANGE [$$69, $$70]  |PARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$70, $$71]  |PARTITIONED|
                       -- SORT_GROUP_BY[$$50, $$54]  |PARTITIONED|
                               {
                                 -- AGGREGATE  |LOCAL|
@@ -31,7 +31,7 @@
                                 -- HASH_PARTITION_EXCHANGE [$$56]  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- HYBRID_HASH_JOIN [$$54][$$63]  |PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$54][$$64]  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
@@ -48,7 +48,7 @@
                                                       -- DATASOURCE_SCAN  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                        -- HASH_PARTITION_EXCHANGE [$$63]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$64]  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
index e229e75..1d24a80 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810-2.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$75, $$76]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$69, $$70]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -17,8 +17,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$75(ASC), $$76(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$75, $$76]  |PARTITIONED|
+              -- STABLE_SORT [$$69(ASC), $$70(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$69, $$70]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$42, $$43]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
index ca941bf..3cfd8af 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue810.plan
@@ -3,7 +3,7 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$l_returnflag(ASC), $$l_linestatus(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$34, $$35]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$36, $$37]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
@@ -13,8 +13,8 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$34(ASC), $$35(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$34, $$35]  |PARTITIONED|
+              -- STABLE_SORT [$$36(ASC), $$37(ASC)]  |PARTITIONED|
+                -- HASH_PARTITION_EXCHANGE [$$36, $$37]  |PARTITIONED|
                   -- PRE_CLUSTERED_GROUP_BY[$$23, $$24]  |PARTITIONED|
                           {
                             -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
index a5124c3..2eb7603 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1308-2.plan
@@ -27,34 +27,34 @@
                               -- UNNEST  |LOCAL|
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                       }
-                -- SUBPLAN  |PARTITIONED|
-                        {
-                          -- AGGREGATE  |LOCAL|
-                            -- STREAM_SELECT  |LOCAL|
-                              -- ASSIGN  |LOCAL|
-                                -- UNNEST  |LOCAL|
-                                  -- SUBPLAN  |LOCAL|
-                                          {
-                                            -- AGGREGATE  |LOCAL|
-                                              -- IN_MEMORY_STABLE_SORT [$$j(ASC)]  |LOCAL|
-                                                -- UNNEST  |LOCAL|
-                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                          }
-                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                        }
-                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97]  |PARTITIONED|
-                            {
-                              -- AGGREGATE  |LOCAL|
-                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                            }
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), $$97(ASC)]  |PARTITIONED|
-                          -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97]  |PARTITIONED|
-                            -- STREAM_PROJECT  |PARTITIONED|
-                              -- ASSIGN  |PARTITIONED|
-                                -- STREAM_PROJECT  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- DATASOURCE_SCAN  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$94, $$95, $$96, $$97]  |PARTITIONED|
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- ASSIGN  |LOCAL|
+                                  -- UNNEST  |LOCAL|
+                                    -- SUBPLAN  |LOCAL|
+                                            {
+                                              -- AGGREGATE  |LOCAL|
+                                                -- IN_MEMORY_STABLE_SORT [$$j(ASC)]  |LOCAL|
+                                                  -- UNNEST  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                            }
+                                      -- AGGREGATE  |LOCAL|
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STABLE_SORT [$$94(ASC), $$95(ASC), $$96(ASC), $$97(ASC)]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$94, $$95, $$96, $$97]  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index cb89a73..cef387a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -43,12 +44,15 @@
     private final IVariableContext varContext;
     private final Map<LogicalVariable, LogicalVariable> inVarMapping;
     private final Map<LogicalVariable, LogicalVariable> outVarMapping;
+    private final Set<LogicalVariable> freeVars;
 
     public LogicalExpressionDeepCopyWithNewVariablesVisitor(IVariableContext varContext,
-            Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping) {
+            Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping,
+            Set<LogicalVariable> freeVars) {
         this.varContext = varContext;
         this.inVarMapping = inVarMapping;
         this.outVarMapping = variableMapping;
+        this.freeVars = freeVars;
     }
 
     public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException {
@@ -146,6 +150,9 @@
     public ILogicalExpression visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
             throws AlgebricksException {
         LogicalVariable var = expr.getVariableReference();
+        if (freeVars.contains(var)) {
+            return expr;
+        }
         LogicalVariable givenVarReplacement = inVarMapping.get(var);
         if (givenVarReplacement != null) {
             outVarMapping.put(var, givenVarReplacement);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 0da9110..934577f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -19,9 +19,11 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -38,10 +40,10 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -52,7 +54,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
@@ -64,10 +65,12 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
 
 /**
@@ -87,7 +90,13 @@
 
     // Key: New variable in the new plan. Value: The old variable in the
     // original plan.
-    private final LinkedHashMap<LogicalVariable, LogicalVariable> outputVarToInputVarMapping;
+    private final LinkedHashMap<LogicalVariable, LogicalVariable> outputVarToInputVarMapping = new LinkedHashMap<>();
+
+    // Free variables: variables that are kept as-is by the deep copy, i.e., never re-mapped to new variables.
+    private final Set<LogicalVariable> freeVars = new HashSet<>();
+
+    // Whether free variables in the given plan subtree should be reused.
+    private final boolean reuseFreeVars;
 
     /**
      * @param varContext
@@ -96,12 +105,20 @@
      *            the type context.
      */
     public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext) {
-        this.varContext = varContext;
-        this.typeContext = typeContext;
-        this.inputVarToOutputVarMapping = new LinkedHashMap<>();
-        this.outputVarToInputVarMapping = new LinkedHashMap<>();
-        this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext,
-                outputVarToInputVarMapping, inputVarToOutputVarMapping);
+        this(varContext, typeContext, new LinkedHashMap<>(), false);
+    }
+
+    /**
+     * @param varContext
+     *            the variable context.
+     * @param typeContext
+     *            the type context.
+     * @param reuseFreeVars
+     *            whether free variables in the given plan tree should be reused.
+     */
+    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
+            boolean reuseFreeVars) {
+        this(varContext, typeContext, new LinkedHashMap<>(), reuseFreeVars);
     }
 
     /**
@@ -113,15 +130,17 @@
      *            Variable mapping keyed by variables in the original plan.
      *            Those variables are replaced by their corresponding value in
      *            the map in the copied plan.
+     * @param reuseFreeVars
+     *            whether free variables in the given plan tree should be reused.
      */
     public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
-            LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping) {
+            LinkedHashMap<LogicalVariable, LogicalVariable> inVarMapping, boolean reuseFreeVars) {
         this.varContext = varContext;
         this.typeContext = typeContext;
         this.inputVarToOutputVarMapping = inVarMapping;
-        this.outputVarToInputVarMapping = new LinkedHashMap<>();
-        exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
-                inputVarToOutputVarMapping);
+        this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
+                inputVarToOutputVarMapping, freeVars);
+        this.reuseFreeVars = reuseFreeVars;
     }
 
     private void copyAnnotations(ILogicalOperator src, ILogicalOperator dest) {
@@ -137,6 +156,11 @@
         if (op == null) {
             return null;
         }
+        if (reuseFreeVars) {
+            // If the reuseFreeVars flag is set, we collect all free variables in the
+            // given operator subtree and do not re-map them in the deep-copied plan.
+            OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) op, freeVars);
+        }
         ILogicalOperator opCopy = op.accept(this, arg);
         if (typeContext != null) {
             OperatorManipulationUtil.computeTypeEnvironmentBottomUp(opCopy, typeContext);
@@ -207,6 +231,9 @@
         if (var == null) {
             return null;
         }
+        if (freeVars.contains(var)) {
+            return var;
+        }
         LogicalVariable givenVarReplacement = outputVarToInputVarMapping.get(var);
         if (givenVarReplacement != null) {
             inputVarToOutputVarMapping.put(var, givenVarReplacement);
@@ -247,6 +274,7 @@
     }
 
     public void reset() {
+        freeVars.clear();
         inputVarToOutputVarMapping.clear();
         outputVarToInputVarMapping.clear();
     }
@@ -563,10 +591,6 @@
         return opCopy;
     }
 
-    public LinkedHashMap<LogicalVariable, LogicalVariable> getOutputToInputVariableMapping() {
-        return outputVarToInputVarMapping;
-    }
-
     public LinkedHashMap<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() {
         return inputVarToOutputVarMapping;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 8a8fe6d..8d00696 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,10 +20,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -38,6 +40,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.
+        LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -196,6 +200,14 @@
         return new ALogicalPlanImpl(newRoots);
     }
 
+    public static Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> deepCopyWithNewVars(
+            ILogicalOperator root, IOptimizationContext ctx) throws AlgebricksException {
+        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new
+                LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, true);
+        ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
+        return Pair.of(newRoot, deepCopyVisitor.getInputToOutputVariableMapping());
+    }
+
     private static void setDataSource(ILogicalPlan plan, ILogicalOperator dataSource) {
         for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
             setDataSource(rootRef, dataSource);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 8b36a68..463f214 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -75,7 +75,7 @@
      * collection provided.
      *
      * @param op
-     * @param vars
+     * @param freeVars
      *            - The collection to which the free variables will be added.
      */
     public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator op, Set<LogicalVariable> freeVars)
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index e73f2a5..ed4196b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -322,7 +322,7 @@
             // Finds the reference of the bottom-most operator in the pipeline that
             // should not be pushed to the combiner group-by.
             Mutable<ILogicalOperator> currentOpRef = new MutableObject<ILogicalOperator>(nestedGby);
-            Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(currentOpRef);
+            Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(nestedGby, currentOpRef);
 
             // Adds the used variables in the pipeline from <code>currentOpRef</code> to <code>bottomOpRef</code>
             // into the group-by keys for the introduced combiner group-by operator.
@@ -392,16 +392,29 @@
      * Find the bottom-most nested operator reference in the query pipeline rooted at <code>currentOpRef</code>
      * that cannot be pushed into the combiner group-by operator.
      *
-     * @param currentOpRef
+     * @param nestedGby
+     *            the nested group-by operator.
+     * @param currentOpRef
+     *            the reference of the current op.
      * @return the bottom-most reference of a select operator
      */
-    private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
+    private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(GroupByOperator nestedGby,
+            Mutable<ILogicalOperator> currentOpRef)
             throws AlgebricksException {
+        Set<LogicalVariable> usedVarsInNestedGby = new HashSet<>();
+        // Collects used variables in nested pipelines.
+        for (ILogicalPlan nestedPlan : nestedGby.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> rootOpRef : nestedPlan.getRoots()) {
+                VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), usedVarsInNestedGby);
+            }
+        }
         Mutable<ILogicalOperator> bottomOpRef = currentOpRef;
         while (currentOpRef.getValue().getInputs().size() > 0) {
-            Set<LogicalVariable> producedVars = new HashSet<>();
-            VariableUtilities.getProducedVariables(currentOpRef.getValue(), producedVars);
-            if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !producedVars.isEmpty()) {
+            // Variables produced by the current operator that are used inside nestedGby, i.e., a dependency.
+            Set<LogicalVariable> dependingVars = new HashSet<>();
+            VariableUtilities.getProducedVariables(currentOpRef.getValue(), dependingVars);
+            dependingVars.retainAll(usedVarsInNestedGby);
+            if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !dependingVars.isEmpty()) {
                 bottomOpRef = currentOpRef;
             }
             currentOpRef = currentOpRef.getValue().getInputs().get(0);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 3a86565..af95ecd 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -20,13 +20,18 @@
 
 package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -40,7 +45,9 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
@@ -51,151 +58,209 @@
  */
 
 public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
-    /** Stores used variables above the current operator. */
-    private final Set<LogicalVariable> usedVarsSoFar = new HashSet<LogicalVariable>();
+    /** The pointer to the topmost operator */
+    private Mutable<ILogicalOperator> rootRef;
+    /** Whether the rule has ever been invoked */
+    private boolean invoked = false;
 
     @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        return false;
+        if (!invoked) {
+            rootRef = opRef;
+            invoked = true;
+        }
+        return rewriteForOperator(rootRef, opRef, context);
     }
 
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        ILogicalOperator parentOperator = opRef.getValue();
-        if (context.checkIfInDontApplySet(this, parentOperator)) {
-            return false;
-        }
-        context.addToDontApplySet(this, parentOperator);
-        VariableUtilities.getUsedVariables(parentOperator, usedVarsSoFar);
-        if (parentOperator.getInputs().size() <= 0) {
-            return false;
-        }
+    // The core rewriting function for an operator.
+    private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef, Mutable<ILogicalOperator> opRef,
+            IOptimizationContext context) throws AlgebricksException {
         boolean changed = false;
-        GroupByOperator gby = null;
+        ILogicalOperator parentOperator = opRef.getValue();
         for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
-            AbstractLogicalOperator op = (AbstractLogicalOperator) ref.getValue();
-            /** Only processes subplan operator. */
-            List<SubplanOperator> subplans = new ArrayList<SubplanOperator>();
-            if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-                while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-                    SubplanOperator currentSubplan = (SubplanOperator) op;
-                    subplans.add(currentSubplan);
-                    op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            ILogicalOperator op = ref.getValue();
+            // Only processes subplan operator.
+            Deque<SubplanOperator> subplans = new ArrayDeque<>();
+            if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                // Recursively rewrites the child plan.
+                changed |= rewriteForOperator(rootRef, ref, context);
+                continue;
+            }
+            while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                SubplanOperator currentSubplan = (SubplanOperator) op;
+                // Recursively rewrites the pipelines inside a nested subplan.
+                for (ILogicalPlan subplan : currentSubplan.getNestedPlans()) {
+                    for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
+                        changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
+                    }
                 }
-                /** Only processes the case a group-by operator is the input of the subplan operators. */
-                if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
-                    gby = (GroupByOperator) op;
-                    List<ILogicalPlan> newGbyNestedPlans = new ArrayList<ILogicalPlan>();
-                    for (SubplanOperator subplan : subplans) {
-                        List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans();
-                        List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans();
-                        List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>();
-                        for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) {
-                            List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots();
-                            List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>();
-                            for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) {
-                                /** Gets free variables in the root operator of a nested plan and its descent. */
-                                Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
-                                VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars);
-                                Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
-                                VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
-                                        producedVars);
-                                freeVars.removeAll(producedVars);
-                                /** * Checks whether the above freeVars are all contained in live variables * of one nested plan inside the group-by operator. * If yes, then the subplan can be pushed into the nested plan of the group-by. */
-                                for (ILogicalPlan gbyNestedPlanOriginal : gbyNestedPlans) {
-                                    // add a subplan in the original gby
-                                    if (!newGbyNestedPlans.contains(gbyNestedPlanOriginal)) {
-                                        newGbyNestedPlans.add(gbyNestedPlanOriginal);
-                                    }
-
-                                    // add a pushed subplan
-                                    ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(
-                                            gbyNestedPlanOriginal, context);
-                                    List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
-                                    for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
-                                        //set the nts for a original subplan
-                                        Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlanOriginal
-                                                .getRoots().get(rootIndex);
-                                        Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
-                                        NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef
-                                                .getValue();
-                                        originalNts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby));
-
-                                        //push a new subplan if possible
-                                        Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
-                                        Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>();
-                                        VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
-                                        if (liveVars.containsAll(freeVars)) {
-                                            /** Does the actual push. */
-                                            Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
-                                            ntsRef.setValue(gbyRootOpRef.getValue());
-                                            // Removes unused vars.
-                                            AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
-                                            for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
-                                                if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
-                                                    aggOp.getVariables().remove(varIndex);
-                                                    aggOp.getExpressions().remove(varIndex);
-                                                }
-                                            }
-
-                                            gbyRootOpRef.setValue(rootOpRef.getValue());
-                                            rootOpRefsToRemove.add(rootOpRef);
-
-                                            // Sets the nts for a new pushed plan.
-                                            Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(gbyRootOpRef);
-                                            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef
-                                                    .getValue();
-                                            nts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby));
-
-                                            newGbyNestedPlans.add(gbyNestedPlan);
-                                            changed = true;
-                                            continue;
-                                        }
-                                    }
-                                }
-                            }
-                            rootOpRefs.removeAll(rootOpRefsToRemove);
-                            if (rootOpRefs.size() == 0) {
-                                subplanNestedPlansToRemove.add(subplanNestedPlan);
-                            }
-                        }
-                        subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
-                    }
-                    if (changed) {
-                        ref.setValue(gby);
-                        gby.getNestedPlans().clear();
-                        gby.getNestedPlans().addAll(newGbyNestedPlans);
-                    }
+                subplans.addFirst(currentSubplan);
+                op = op.getInputs().get(0).getValue();
+            }
+            // Only processes the case where a group-by operator is the input of the subplan operators.
+            if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+                continue;
+            }
+            GroupByOperator gby = (GroupByOperator) op;
+            // Recursively rewrites the nested pipelines inside the group-by operator.
+            for (ILogicalPlan subplan : gby.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
+                    changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
                 }
             }
+            changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, subplans, gby, context);
         }
-        if (changed) {
-            cleanup(gby);
-            context.computeAndSetTypeEnvironmentForOperator(gby);
-            context.computeAndSetTypeEnvironmentForOperator(parentOperator);
+        return changed;
+    }
+
+    // Pushes subplans into the group by operator.
+    private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator> currentRootRef, ILogicalOperator parentOperator,
+            Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context)
+            throws AlgebricksException {
+        boolean changed = false;
+        List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>();
+        List<ILogicalPlan> originalNestedPlansInGby = gby.getNestedPlans();
+
+        // Adds all original subplans from the group by.
+        for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+            newGbyNestedPlans.add(gbyNestedPlanOriginal);
         }
+
+        // Tries to push subplans into the group by.
+        Iterator<SubplanOperator> subplanOperatorIterator = subplans.iterator();
+        while (subplanOperatorIterator.hasNext()) {
+            SubplanOperator subplan = subplanOperatorIterator.next();
+            Iterator<ILogicalPlan> subplanNestedPlanIterator = subplan.getNestedPlans().iterator();
+            while (subplanNestedPlanIterator.hasNext()) {
+                ILogicalPlan subplanNestedPlan = subplanNestedPlanIterator.next();
+                List<Mutable<ILogicalOperator>> upperSubplanRootRefs = subplanNestedPlan.getRoots();
+                Iterator<Mutable<ILogicalOperator>> upperSubplanRootRefIterator = upperSubplanRootRefs.iterator();
+                while (upperSubplanRootRefIterator.hasNext()) {
+                    Mutable<ILogicalOperator> rootOpRef = upperSubplanRootRefIterator.next();
+
+                    // Collects free variables in the root operator of a nested plan and its descendants.
+                    Set<LogicalVariable> freeVars = new ListSet<>();
+                    OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) rootOpRef.getValue(),
+                            freeVars);
+
+                    // Checks whether the above freeVars are all contained in live variables of one nested plan
+                    // inside the group-by operator. If yes, then the subplan can be pushed into the nested plan
+                    // of the group-by.
+                    for (ILogicalPlan gbyNestedPlanOriginal : originalNestedPlansInGby) {
+                        ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(gbyNestedPlanOriginal, context);
+                        List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
+                        for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
+                            // Sets the data source of the nts in the copied group-by nested plan.
+                            Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlan.getRoots().get(rootIndex);
+                            Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
+                            NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef
+                                    .getValue();
+                            originalNts.setDataSourceReference(new MutableObject<>(gby));
+
+                            // Pushes a new subplan if possible.
+                            Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
+                            Set<LogicalVariable> liveVars = new ListSet<>();
+                            VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+                            if (!liveVars.containsAll(freeVars)) {
+                                continue;
+                            }
+
+                            AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
+                            for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
+                                if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
+                                    aggOp.getVariables().remove(varIndex);
+                                    aggOp.getExpressions().remove(varIndex);
+                                }
+                            }
+
+                            // Copy the original nested pipeline inside the group-by.
+                            Pair<ILogicalOperator, Map<LogicalVariable, LogicalVariable>> copiedAggOpAndVarMap =
+                                    OperatorManipulationUtil.deepCopyWithNewVars(aggOp, context);
+                            ILogicalOperator newBottomAgg = copiedAggOpAndVarMap.getLeft();
+
+                            // Substitutes variables in the upper nested pipeline.
+                            VariableUtilities.substituteVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+                                    copiedAggOpAndVarMap.getRight(), context);
+
+                            // Does the actual push.
+                            Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
+                            ntsRef.setValue(newBottomAgg);
+                            gbyRootOpRef.setValue(rootOpRef.getValue());
+
+                            // Sets the nts for a new pushed plan.
+                            Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(new MutableObject<>(newBottomAgg));
+                            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef.getValue();
+                            nts.setDataSourceReference(new MutableObject<>(gby));
+
+                            OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rootOpRef.getValue(), context);
+                            newGbyNestedPlans.add(new ALogicalPlanImpl(rootOpRef));
+
+                            upperSubplanRootRefIterator.remove();
+                            changed |= true;
+                            break;
+                        }
+                    }
+                }
+
+                if (upperSubplanRootRefs.isEmpty()) {
+                    subplanNestedPlanIterator.remove();
+                }
+            }
+            if (subplan.getNestedPlans().isEmpty()) {
+                subplanOperatorIterator.remove();
+            }
+        }
+
+        // Resets the nested subplans for the group-by operator.
+        gby.getNestedPlans().clear();
+        gby.getNestedPlans().addAll(newGbyNestedPlans);
+
+        // Connects the group-by operator with its parent operator.
+        ILogicalOperator parent = !subplans.isEmpty() ? subplans.getFirst() : parentOperator;
+        parent.getInputs().get(0).setValue(gby);
+
+        // Removes unnecessary pipelines inside the group by operator.
+        cleanup(currentRootRef.getValue(), gby);
+
+        // Computes type environments.
+        context.computeAndSetTypeEnvironmentForOperator(gby);
+        context.computeAndSetTypeEnvironmentForOperator(parent);
         return changed;
     }
 
     /**
      * Removes unused aggregation variables (and expressions)
      *
-     * @param gby
+     * @param rootOp
+     *            the root operator of a plan or nested plan.
+     * @param gby
+     *            the group-by operator.
      * @throws AlgebricksException
      */
-    private void cleanup(GroupByOperator gby) throws AlgebricksException {
-        for (ILogicalPlan nestedPlan : gby.getNestedPlans()) {
-            for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
-                AggregateOperator aggOp = (AggregateOperator) rootRef.getValue();
+    private void cleanup(ILogicalOperator rootOp, GroupByOperator gby) throws AlgebricksException {
+        Set<LogicalVariable> freeVars = new HashSet<>();
+        OperatorPropertiesUtil.getFreeVariablesInPath(rootOp, gby, freeVars);
+        Iterator<ILogicalPlan> nestedPlanIterator = gby.getNestedPlans().iterator();
+        while (nestedPlanIterator.hasNext()) {
+            ILogicalPlan nestedPlan = nestedPlanIterator.next();
+            Iterator<Mutable<ILogicalOperator>> nestRootRefIterator = nestedPlan.getRoots().iterator();
+            while (nestRootRefIterator.hasNext()) {
+                Mutable<ILogicalOperator> nestRootRef = nestRootRefIterator.next();
+                AggregateOperator aggOp = (AggregateOperator) nestRootRef.getValue();
                 for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
-                    if (!usedVarsSoFar.contains(aggOp.getVariables().get(varIndex))) {
+                    if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
                         aggOp.getVariables().remove(varIndex);
                         aggOp.getExpressions().remove(varIndex);
                     }
                 }
+                if (aggOp.getVariables().isEmpty()) {
+                    nestRootRefIterator.remove();
+                }
             }
-
+            if (nestedPlan.getRoots().isEmpty()) {
+                nestedPlanIterator.remove();
+            }
         }
     }