Merge branch 'gerrit/cheshire-cat'

Change-Id: I30c4b693504c22aba85cdd5bc5f8fe34e262e29a
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/AsterixMoveFreeVariableOperatorOutOfSubplanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/AsterixMoveFreeVariableOperatorOutOfSubplanRule.java
index 4d6b4bb..66dcc83 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/AsterixMoveFreeVariableOperatorOutOfSubplanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/AsterixMoveFreeVariableOperatorOutOfSubplanRule.java
@@ -18,13 +18,22 @@
  */
 package org.apache.asterix.optimizer.rules.subplan;
 
+import java.util.Set;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.MoveFreeVariableOperatorOutOfSubplanRule;
 
 public class AsterixMoveFreeVariableOperatorOutOfSubplanRule extends MoveFreeVariableOperatorOutOfSubplanRule {
 
     @Override
-    protected boolean movableOperator(LogicalOperatorTag operatorTag) {
+    protected boolean movableOperatorKind(LogicalOperatorTag operatorTag) {
         return (operatorTag == LogicalOperatorTag.ASSIGN);
     }
+
+    @Override
+    protected boolean movableIndependentOperator(ILogicalOperator op, Set<LogicalVariable> usedVars) {
+        return !usedVars.isEmpty();
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/loj-03-no-listify.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/loj-03-no-listify.sqlpp
new file mode 100644
index 0000000..510e26c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/loj-03-no-listify.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Test that listify() is eliminated
+ *              on the right side of an outer join
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE tasksType AS {
+  id : integer
+};
+
+CREATE DATASET tasks(tasksType) PRIMARY KEY id;
+
+SELECT t0.taskId, t0.cnt_all, t1.cnt_x, t2.cnt_y, t3.cnt_z
+FROM (
+  SELECT taskId, COUNT(1) AS cnt_all FROM tasks GROUP BY taskId ORDER BY taskId
+) AS t0
+LEFT OUTER JOIN (
+  SELECT taskId, COUNT(1) AS cnt_x FROM tasks WHERE status="x" GROUP BY taskId
+) AS t1 ON t0.taskId = t1.taskId
+LEFT OUTER JOIN (
+  SELECT taskId, COUNT(1) AS cnt_y FROM tasks WHERE status="y" GROUP BY taskId
+) AS t2 ON t0.taskId = t2.taskId
+LEFT OUTER JOIN (
+  SELECT taskId, COUNT(1) AS cnt_z FROM tasks WHERE status="z" GROUP BY taskId
+) AS t3 ON t0.taskId = t3.taskId
+ORDER BY t0.taskId;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan
new file mode 100644
index 0000000..6d569dc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/loj-03-no-listify.plan
@@ -0,0 +1,127 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$taskId(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$taskId(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$taskId][$$taskId]  |PARTITIONED|
+                                    -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$taskId(ASC)] HASH:[$$taskId]  |PARTITIONED|
+                                      -- SORT_GROUP_BY[$$279]  |PARTITIONED|
+                                              {
+                                                -- AGGREGATE  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                              }
+                                        -- HASH_PARTITION_EXCHANGE [$$279]  |PARTITIONED|
+                                          -- SORT_GROUP_BY[$$242]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- REPLICATE  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- SORT_GROUP_BY[$$281]  |PARTITIONED|
+                                                    {
+                                                      -- AGGREGATE  |LOCAL|
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                    }
+                                              -- HASH_PARTITION_EXCHANGE [$$281]  |PARTITIONED|
+                                                -- SORT_GROUP_BY[$$243]  |PARTITIONED|
+                                                        {
+                                                          -- AGGREGATE  |LOCAL|
+                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                        }
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- REPLICATE  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- SORT_GROUP_BY[$$283]  |PARTITIONED|
+                                            {
+                                              -- AGGREGATE  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                            }
+                                      -- HASH_PARTITION_EXCHANGE [$$283]  |PARTITIONED|
+                                        -- SORT_GROUP_BY[$$244]  |PARTITIONED|
+                                                {
+                                                  -- AGGREGATE  |LOCAL|
+                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                }
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- REPLICATE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- SORT_GROUP_BY[$$285]  |PARTITIONED|
+                                    {
+                                      -- AGGREGATE  |LOCAL|
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                    }
+                              -- HASH_PARTITION_EXCHANGE [$$285]  |PARTITIONED|
+                                -- SORT_GROUP_BY[$$245]  |PARTITIONED|
+                                        {
+                                          -- AGGREGATE  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                        }
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- STREAM_SELECT  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- REPLICATE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- DATASOURCE_SCAN (test.tasks)  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.1.ddl.sqlpp
new file mode 100644
index 0000000..bc81bff
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.1.ddl.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE tasksType AS {
+  id : integer
+};
+
+CREATE DATASET tasks(tasksType) PRIMARY KEY id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.2.update.sqlpp
new file mode 100644
index 0000000..061d357
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.2.update.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO tasks
+([
+  {"id": 1,  "taskId":1, "status":"x"},
+  {"id": 2,  "taskId":1, "status":"x"},
+  {"id": 3,  "taskId":1, "status":"y"},
+  {"id": 4,  "taskId":1, "status":"y"},
+  {"id": 5,  "taskId":1, "status":"z"},
+  {"id": 6,  "taskId":1, "status":"z"},
+  {"id": 7,  "taskId":2, "status":"x"},
+  {"id": 8,  "taskId":2, "status":"x"},
+  {"id": 9,  "taskId":2, "status":"y"},
+  {"id": 10, "taskId":2, "status":"y"},
+  {"id": 11, "taskId":2, "status":"z"},
+  {"id": 12, "taskId":2, "status":"z"}
+]);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.3.query.sqlpp
new file mode 100644
index 0000000..d64e711
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/loj-03-no-listify/loj-03-no-listify.3.query.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Test that listify() is eliminated
+ *              on the right side of an outer join
+ */
+
+USE test;
+
+SELECT t0.taskId, t0.cnt_all, t1.cnt_x, t2.cnt_y, t3.cnt_z
+FROM (
+  SELECT taskId, COUNT(1) AS cnt_all FROM tasks GROUP BY taskId ORDER BY taskId
+) AS t0
+LEFT OUTER JOIN (
+  SELECT taskId, COUNT(1) AS cnt_x FROM tasks WHERE status="x" GROUP BY taskId
+) AS t1 ON t0.taskId = t1.taskId
+LEFT OUTER JOIN (
+  SELECT taskId, COUNT(1) AS cnt_y FROM tasks WHERE status="y" GROUP BY taskId
+) AS t2 ON t0.taskId = t2.taskId
+LEFT OUTER JOIN (
+  SELECT taskId, COUNT(1) AS cnt_z FROM tasks WHERE status="z" GROUP BY taskId
+) AS t3 ON t0.taskId = t3.taskId
+ORDER BY t0.taskId;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/loj-03-no-listify/loj-03-no-listify.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/loj-03-no-listify/loj-03-no-listify.3.adm
new file mode 100644
index 0000000..466b846
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/loj-03-no-listify/loj-03-no-listify.3.adm
@@ -0,0 +1,2 @@
+{ "cnt_all": 6, "cnt_x": 2, "cnt_y": 2, "cnt_z": 2, "taskId": 1 }
+{ "cnt_all": 6, "cnt_x": 2, "cnt_y": 2, "cnt_z": 2, "taskId": 2 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index a6e06ba..22c2f7d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -12852,6 +12852,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="leftouterjoin">
+      <compilation-unit name="loj-03-no-listify">
+        <output-dir compare="Text">loj-03-no-listify</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="leftouterjoin">
       <compilation-unit name="query_issue658">
         <output-dir compare="Text">query_issue658</output-dir>
       </compilation-unit>
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java
index 94cae74..66ee3a4 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java
@@ -92,8 +92,7 @@
                     ILogicalOperator childOp = childOpRef.getValue();
 
                     // Try to move operators that only uses free variables out of the subplan.
-                    if (movableOperator(currentOp.getOperatorTag())
-                            && independentOperator(currentOp, liveVarsBeforeSubplan)
+                    if (movableOperator(currentOp, liveVarsBeforeSubplan)
                             && producedVariablesCanbePropagated(currentOp)) {
                         extractOperator(subplanOp, inputOp, currentOpRef);
                         inputOp = currentOp;
@@ -109,12 +108,17 @@
         return changed;
     }
 
-    // Checks whether the current operator is independent of the nested input pipeline in the subplan.
-    private boolean independentOperator(ILogicalOperator op, Set<LogicalVariable> liveVarsBeforeSubplan)
+    private boolean movableOperator(ILogicalOperator op, Set<LogicalVariable> liveVarsBeforeSubplan)
             throws AlgebricksException {
+        if (!movableOperatorKind(op.getOperatorTag())) {
+            return false;
+        }
+
+        // Checks whether the current operator is independent of the nested input pipeline in the subplan.
         Set<LogicalVariable> usedVars = new HashSet<>();
         VariableUtilities.getUsedVariables(op, usedVars);
-        return liveVarsBeforeSubplan.containsAll(usedVars);
+        boolean independent = liveVarsBeforeSubplan.containsAll(usedVars);
+        return independent && movableIndependentOperator(op, usedVars);
     }
 
     // Checks whether there is a variable killing operator in the nested pipeline
@@ -152,7 +156,11 @@
         currentOp.getInputs().get(0).setValue(inputOp);
     }
 
-    protected boolean movableOperator(LogicalOperatorTag operatorTag) {
-        return (operatorTag == LogicalOperatorTag.ASSIGN || operatorTag == LogicalOperatorTag.SUBPLAN);
+    protected boolean movableOperatorKind(LogicalOperatorTag operatorTag) {
+        return operatorTag == LogicalOperatorTag.ASSIGN || operatorTag == LogicalOperatorTag.SUBPLAN;
+    }
+
+    protected boolean movableIndependentOperator(ILogicalOperator op, Set<LogicalVariable> usedVars) {
+        return true;
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 6d5d67d..420bf12 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -70,29 +70,30 @@
             rootRef = opRef;
             invoked = true;
         }
-        return rewriteForOperator(rootRef, opRef, context);
+        return rewriteForOperator(rootRef, opRef.getValue(), context);
     }
 
     // The core rewriting function for an operator.
-    private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef, Mutable<ILogicalOperator> opRef,
+    private boolean rewriteForOperator(Mutable<ILogicalOperator> rootRef, ILogicalOperator parentOperator,
             IOptimizationContext context) throws AlgebricksException {
         boolean changed = false;
-        ILogicalOperator parentOperator = opRef.getValue();
-        for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
+        List<Mutable<ILogicalOperator>> parentInputs = parentOperator.getInputs();
+        for (int i = 0, n = parentInputs.size(); i < n; i++) {
+            Mutable<ILogicalOperator> ref = parentInputs.get(i);
             ILogicalOperator op = ref.getValue();
             // Only processes subplan operator.
-            Deque<SubplanOperator> subplans = new ArrayDeque<>();
             if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
                 // Recursively rewrites the child plan.
-                changed |= rewriteForOperator(rootRef, ref, context);
+                changed |= rewriteForOperator(rootRef, op, context);
                 continue;
             }
+            Deque<SubplanOperator> subplans = new ArrayDeque<>();
             while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
                 SubplanOperator currentSubplan = (SubplanOperator) op;
                 // Recursively rewrites the pipelines inside a nested subplan.
                 for (ILogicalPlan subplan : currentSubplan.getNestedPlans()) {
                     for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
-                        changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
+                        changed |= rewriteForOperator(nestedRootRef, nestedRootRef.getValue(), context);
                     }
                 }
                 subplans.addFirst(currentSubplan);
@@ -106,17 +107,17 @@
             // Recursively rewrites the pipelines inside a nested subplan.
             for (ILogicalPlan subplan : gby.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> nestedRootRef : subplan.getRoots()) {
-                    changed |= rewriteForOperator(nestedRootRef, nestedRootRef, context);
+                    changed |= rewriteForOperator(nestedRootRef, nestedRootRef.getValue(), context);
                 }
             }
-            changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, subplans, gby, context);
+            changed |= pushSubplansIntoGroupBy(rootRef, parentOperator, i, subplans, gby, context);
         }
         return changed;
     }
 
     // Pushes subplans into the group by operator.
     private boolean pushSubplansIntoGroupBy(Mutable<ILogicalOperator> currentRootRef, ILogicalOperator parentOperator,
-            Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context)
+            int parentChildIdx, Deque<SubplanOperator> subplans, GroupByOperator gby, IOptimizationContext context)
             throws AlgebricksException {
         boolean changed = false;
         List<ILogicalPlan> newGbyNestedPlans = new ArrayList<>();
@@ -226,8 +227,16 @@
         gby.getNestedPlans().addAll(newGbyNestedPlans);
 
         // Connects the group-by operator with its parent operator.
-        ILogicalOperator parent = !subplans.isEmpty() ? subplans.getFirst() : parentOperator;
-        parent.getInputs().get(0).setValue(gby);
+        ILogicalOperator parent;
+        int childIdx;
+        if (!subplans.isEmpty()) {
+            parent = subplans.getFirst();
+            childIdx = 0;
+        } else {
+            parent = parentOperator;
+            childIdx = parentChildIdx;
+        }
+        parent.getInputs().get(childIdx).setValue(gby);
 
         // Removes unnecessary pipelines inside the group by operator.
         changed |= cleanup(currentRootRef.getValue(), gby);