ASTERIXDB-1081, ASTERIXDB-1086, ASTERIXDB-1246: proper multiple plan paths handling

 - ASTERIXDB-1081: Fixed RemoveUnusedAssignAndAggregateRule to reflect multiple paths in the plan.
 - ASTERIXDB-1086: Fixed IntroduceProjectsRule to reflect multiples paths in the plan.
 - ASTERIXDB-1246: Fixed RemoveRedundantGroupByDecorVarsRule to remove duplicate/unnecessary
                   decor variables before IntroduceProjects rule fires.

Change-Id: I69e055572f024f28a857d4e64916b4806e63c083
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1073
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index e291dc1..cd8d747 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -115,7 +115,7 @@
 import org.apache.hyracks.algebricks.rewriter.rules.PushUnnestDownThroughUnionRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ReinferAllTypesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveCartesianProductWithEmptyBranchRule;
-import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantGroupByDecorVars;
+import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantGroupByDecorVarsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveRedundantVariablesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnnecessarySortMergeExchange;
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
@@ -265,7 +265,7 @@
         consolidation.add(new IntroduceAggregateCombinerRule());
         consolidation.add(new CountVarToCountOneRule());
         consolidation.add(new RemoveUnusedAssignAndAggregateRule());
-        consolidation.add(new RemoveRedundantGroupByDecorVars());
+        consolidation.add(new RemoveRedundantGroupByDecorVarsRule());
         //PushUnnestDownUnion => RemoveRedundantListifyRule cause these rules are correlated
         consolidation.add(new PushUnnestDownThroughUnionRule());
         consolidation.add(new RemoveRedundantListifyRule());
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
index 9087eeb..931ec7d 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
@@ -9,57 +9,53 @@
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
             -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$82(ASC)] HASH:[$$82]  |PARTITIONED|
-              -- SORT_GROUP_BY[$$58]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$11]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                       }
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
-                    -- ASSIGN  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
                       -- STREAM_PROJECT  |PARTITIONED|
-                        -- STREAM_SELECT  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- PRE_CLUSTERED_GROUP_BY[$$79]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_CLUSTERED_GROUP_BY[$$79]  |PARTITIONED|
+                                  {
+                                    -- AGGREGATE  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                  }
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$79(ASC)] HASH:[$$79]  |PARTITIONED|
+                              -- PRE_CLUSTERED_GROUP_BY[$$75]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
-                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          -- STREAM_SELECT  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$79(ASC)] HASH:[$$79]  |PARTITIONED|
-                                  -- PRE_CLUSTERED_GROUP_BY[$$75]  |PARTITIONED|
-                                          {
-                                            -- AGGREGATE  |LOCAL|
-                                              -- STREAM_SELECT  |LOCAL|
-                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                          }
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$75(ASC)]  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- STABLE_SORT [$$75(ASC)]  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- HYBRID_HASH_JOIN [$$62][$$70]  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$62]  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$65][$$11]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$65]  |PARTITIONED|
-                                                              -- UNNEST  |UNPARTITIONED|
-                                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$70]  |PARTITIONED|
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$62][$$70]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$62]  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- HYBRID_HASH_JOIN [$$65][$$11]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$65]  |PARTITIONED|
+                                                        -- UNNEST  |UNPARTITIONED|
+                                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$70]  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
index fcb9d0d..a379cb8 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/exists.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$31(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$105]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$104]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$105(ASC)] HASH:[$$105]  |PARTITIONED|
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$104(ASC)] HASH:[$$104]  |PARTITIONED|
               -- SORT_GROUP_BY[$$81]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,12 +21,12 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- PRE_CLUSTERED_GROUP_BY[$$102]  |PARTITIONED|
+                              -- PRE_CLUSTERED_GROUP_BY[$$101]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$102(ASC)] HASH:[$$102]  |PARTITIONED|
+                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$101(ASC)] HASH:[$$101]  |PARTITIONED|
                                   -- PRE_CLUSTERED_GROUP_BY[$$95]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
@@ -46,13 +46,12 @@
                                                         -- NESTED_LOOP  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                             -- ASSIGN  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                           -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                             -- STREAM_PROJECT  |UNPARTITIONED|
                                                               -- ASSIGN  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
index 2ef9a24..bc68200 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/subquery/not_exists.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$31(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$106]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$105]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$106(ASC)] HASH:[$$106]  |PARTITIONED|
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$105(ASC)] HASH:[$$105]  |PARTITIONED|
               -- SORT_GROUP_BY[$$82]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,12 +21,12 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- PRE_CLUSTERED_GROUP_BY[$$103]  |PARTITIONED|
+                              -- PRE_CLUSTERED_GROUP_BY[$$102]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
                                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$103(ASC)] HASH:[$$103]  |PARTITIONED|
+                                -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$102(ASC)] HASH:[$$102]  |PARTITIONED|
                                   -- PRE_CLUSTERED_GROUP_BY[$$96]  |PARTITIONED|
                                           {
                                             -- AGGREGATE  |LOCAL|
@@ -46,13 +46,12 @@
                                                         -- NESTED_LOOP  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                             -- ASSIGN  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                           -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                             -- STREAM_PROJECT  |UNPARTITIONED|
                                                               -- ASSIGN  |UNPARTITIONED|
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
index c670b6b..d17e021 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -20,13 +20,14 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -53,6 +54,8 @@
     private final Set<LogicalVariable> producedVars = new HashSet<>();
     private final List<LogicalVariable> projectVars = new ArrayList<>();
     protected boolean hasRun = false;
+    // Keep track of used variables after the current operator, including used variables in itself.
+    private final Map<AbstractLogicalOperator, HashSet<LogicalVariable>> allUsedVarsAfterOpMap = new HashMap<>();
 
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
@@ -60,14 +63,48 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         if (hasRun) {
             return false;
         }
         hasRun = true;
+
+        // Collect all used variables after each operator, including the used variables in itself in the plan.
+        // This is necessary since introduceProjects() may generate a wrong project if it doesn't have the information
+        // for all paths in the plan in case there are two or more branches since it can only deal one path at a time.
+        // So, a variable used in one path might be removed while the method traverses another path.
+        Set<LogicalVariable> parentUsedVars = new HashSet<>();
+        collectUsedVars(opRef, parentUsedVars);
+
+        // Introduce projects
         return introduceProjects(null, -1, opRef, Collections.<LogicalVariable> emptySet(), context);
     }
 
+    /**
+     * Collect all used variables after each operator, including the used variables in itself in the plan.
+     * Collecting information in a separate method is required since there can be multiple paths in the plan
+     * and introduceProjects() method can deal with only one path at a time during conducting depth-first-search.
+     */
+    protected void collectUsedVars(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> parentUsedVars)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        HashSet<LogicalVariable> usedVarsPerOp = new HashSet<>();
+        VariableUtilities.getUsedVariables(op, usedVarsPerOp);
+        usedVarsPerOp.addAll(parentUsedVars);
+
+        if (allUsedVarsAfterOpMap.get(op) == null) {
+            allUsedVarsAfterOpMap.put(op, usedVarsPerOp);
+        } else {
+            allUsedVarsAfterOpMap.get(op).addAll(usedVarsPerOp);
+        }
+
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            collectUsedVars(inputOpRef, usedVarsPerOp);
+        }
+
+    }
+
     protected boolean introduceProjects(AbstractLogicalOperator parentOp, int parentInputIndex,
             Mutable<ILogicalOperator> opRef, Set<LogicalVariable> parentUsedVars, IOptimizationContext context)
             throws AlgebricksException {
@@ -78,10 +115,16 @@
         VariableUtilities.getUsedVariables(op, usedVars);
 
         // In the top-down pass, maintain a set of variables that are used in op and all its parents.
+        // This is a necessary step for the newly created project operator during this optimization,
+        // since we already have all information from collectUsedVars() method for the other operators.
         HashSet<LogicalVariable> parentsUsedVars = new HashSet<>();
         parentsUsedVars.addAll(parentUsedVars);
         parentsUsedVars.addAll(usedVars);
 
+        if (allUsedVarsAfterOpMap.get(op) != null) {
+            parentsUsedVars.addAll(allUsedVarsAfterOpMap.get(op));
+        }
+
         // Descend into children.
         for (int i = 0; i < op.getInputs().size(); i++) {
             Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(i);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
deleted file mode 100644
index ebdc88a..0000000
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.rewriter.rules;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-/**
- * Removes duplicate variables from a group-by operator's decor list.
- */
-public class RemoveRedundantGroupByDecorVars implements IAlgebraicRewriteRule {
-
-    private final Set<LogicalVariable> vars = new HashSet<LogicalVariable>();
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
-            return false;
-        }
-        if (context.checkIfInDontApplySet(this, op)) {
-            return false;
-        }
-        vars.clear();
-
-        boolean modified = false;
-        GroupByOperator groupOp = (GroupByOperator) op;
-        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupOp.getDecorList().iterator();
-        while (iter.hasNext()) {
-            Pair<LogicalVariable, Mutable<ILogicalExpression>> decor = iter.next();
-            if (decor.first != null || decor.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
-                continue;
-            }
-            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) decor.second.getValue();
-            LogicalVariable var = varRefExpr.getVariableReference();
-            if (vars.contains(var)) {
-                iter.remove();
-                modified = true;
-            } else {
-                vars.add(var);
-            }
-        }
-        if (modified) {
-            context.addToDontApplySet(this, op);
-        }
-        return modified;
-    }
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
-}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java
new file mode 100644
index 0000000..b9ad011
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVarsRule.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes duplicate and/or unnecessary variables from a group-by operator's decor list.
+ */
+public class RemoveRedundantGroupByDecorVarsRule implements IAlgebraicRewriteRule {
+
+    private Set<LogicalVariable> usedVars = new HashSet<>();
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        // Begin from the root operator to collect used variables after a possible group-by operator.
+        if (op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT
+                && op.getOperatorTag() != LogicalOperatorTag.SINK) {
+            return false;
+        }
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        usedVars.clear();
+        boolean planTransformed = checkAndApplyTheRule(opRef, context);
+
+        return planTransformed;
+    }
+
+    /**
+     * Collect used variables in each operator in the plan until the optimizer sees a GroupBy operator.
+     * It first removes duplicated variables in the decor list.
+     * Then, it eliminates useless variables in the decor list that are not going to be used
+     * after the given groupBy operator.
+     */
+    protected boolean checkAndApplyTheRule(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        Set<LogicalVariable> usedVarsFromThisOp = new HashSet<>();
+        Set<LogicalVariable> collectedUsedVarsBeforeThisOpFromRoot = new HashSet<>();
+        boolean redundantVarsRemoved = false;
+        boolean uselessVarsRemoved = false;
+
+        // Found Group-By operator?
+        if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            GroupByOperator groupByOp = (GroupByOperator) op;
+            Set<LogicalVariable> decorVars = new HashSet<>();
+
+            // First, get rid of duplicated variables from a group-by operator's decor list.
+            Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupByOp.getDecorList().iterator();
+            while (iter.hasNext()) {
+                Pair<LogicalVariable, Mutable<ILogicalExpression>> decor = iter.next();
+                if (decor.first != null
+                        || decor.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                    continue;
+                }
+                VariableReferenceExpression varRefExpr = (VariableReferenceExpression) decor.second.getValue();
+                LogicalVariable var = varRefExpr.getVariableReference();
+                if (decorVars.contains(var)) {
+                    iter.remove();
+                    redundantVarsRemoved = true;
+                } else {
+                    decorVars.add(var);
+                }
+            }
+
+            // Next, get rid of useless decor variables in the GROUP-BY operator.
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> newDecorList = new ArrayList<>();
+            for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : groupByOp.getDecorList()) {
+                LogicalVariable decorVar = GroupByOperator.getDecorVariable(p);
+                // If a variable in the decor list will not be used after this operator, then it needs to be removed.
+                if (!usedVars.contains(decorVar)) {
+                    uselessVarsRemoved = true;
+                } else {
+                    // Maintain the variable since it will be used.
+                    newDecorList.add(p);
+                }
+            }
+
+            // If we have identified useless decor variables,
+            // then the decor list needs to be reset without those variables.
+            if (uselessVarsRemoved) {
+                groupByOp.getDecorList().clear();
+                groupByOp.getDecorList().addAll(newDecorList);
+            }
+
+            // If the plan transformation is successful, we don't need to traverse the plan any more,
+            // since if there are more GROUP-BY operators, the next trigger on this plan will find them.
+            if (redundantVarsRemoved || uselessVarsRemoved) {
+                context.computeAndSetTypeEnvironmentForOperator(groupByOp);
+                context.addToDontApplySet(this, op);
+                return redundantVarsRemoved || uselessVarsRemoved;
+            }
+        }
+
+        // Either we have found a GroupBy operator but no removal is happened or
+        // there we haven't found a GroupBy operator yet. Thus, we add used variables for this operator
+        // and keep traversing the plan.
+        VariableUtilities.getUsedVariables(op, usedVarsFromThisOp);
+        collectedUsedVarsBeforeThisOpFromRoot.addAll(usedVars);
+        usedVars.addAll(usedVarsFromThisOp);
+
+        // Recursively check the plan and try to optimize it.
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            boolean groupByChanged = checkAndApplyTheRule(op.getInputs().get(i), context);
+            if (groupByChanged) {
+                return true;
+            }
+        }
+
+        // This rule can't be applied to this operator or its descendants.
+        // Thus, remove the effects of this operator so that the depth-first-search can return to the parent.
+        usedVars.clear();
+        usedVars.addAll(collectedUsedVarsBeforeThisOpFromRoot);
+
+        return false;
+    }
+
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index 90ab975..55831f0 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -21,8 +21,9 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -52,47 +53,120 @@
  */
 public class RemoveUnusedAssignAndAggregateRule implements IAlgebraicRewriteRule {
 
+    // Keep the variables that are produced by ASSIGN, UNNEST, AGGREGATE, UNION,
+    // and GROUP operators.
+    private Map<Mutable<ILogicalOperator>, Set<LogicalVariable>> assignedVarMap = new LinkedHashMap<>();
+    private Set<LogicalVariable> assignedVarSet = new HashSet<>();
+
+    // Keep the variables that are used after ASSIGN, UNNEST, AGGREGATE, UNION,
+    // and GROUP operators.
+    private Map<Mutable<ILogicalOperator>, Set<LogicalVariable>> accumulatedUsedVarFromRootMap = new LinkedHashMap<>();
+
+    private boolean isTransformed = false;
+
+    // Keep the variable-mapping of a UNION operator.
+    // This is required to keep the variables of the left or right branch of the UNION operator
+    // if the output variable of the UNION operator is survived.
+    private Set<LogicalVariable> survivedUnionSourceVarSet = new HashSet<>();
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         if (context.checkIfInDontApplySet(this, opRef.getValue())) {
             return false;
         }
-        Set<LogicalVariable> toRemove = new HashSet<LogicalVariable>();
-        collectUnusedAssignedVars((AbstractLogicalOperator) opRef.getValue(), toRemove, true, context);
-        boolean smthToRemove = !toRemove.isEmpty();
-        if (smthToRemove) {
-            removeUnusedAssigns(opRef, toRemove, context);
+
+        clear();
+        Set<LogicalVariable> accumulatedUsedVarFromRootSet = new HashSet<>();
+        collectUnusedAssignedVars(opRef, accumulatedUsedVarFromRootSet, true, context);
+
+        // If there are ASSIGN, UNNEST, AGGREGATE, UNION, and GROUP operators in the plan,
+        // we try to remove these operators if the produced variables from these
+        // operators are not used.
+        if (!assignedVarMap.isEmpty()) {
+            removeUnusedAssigns(opRef, context);
         }
-        return !toRemove.isEmpty();
+
+        return isTransformed;
     }
 
-    private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> toRemove,
-            IOptimizationContext context) throws AlgebricksException {
+    /**
+     * Collect the information from the given operator and removes assigned
+     * variables if they are used afterwards.
+     */
+    private Set<LogicalVariable> removeAssignVarFromConsideration(Mutable<ILogicalOperator> opRef) {
+        Set<LogicalVariable> assignVarsSetForThisOp = null;
+        Set<LogicalVariable> usedVarsSetForThisOp = null;
+
+        if (accumulatedUsedVarFromRootMap.containsKey(opRef)) {
+            usedVarsSetForThisOp = accumulatedUsedVarFromRootMap.get(opRef);
+        }
+
+        if (assignedVarMap.containsKey(opRef)) {
+            assignVarsSetForThisOp = assignedVarMap.get(opRef);
+        }
+
+        if (assignVarsSetForThisOp != null && !assignVarsSetForThisOp.isEmpty()) {
+            Iterator<LogicalVariable> varIter = assignVarsSetForThisOp.iterator();
+            while (varIter.hasNext()) {
+                LogicalVariable v = varIter.next();
+                if ((usedVarsSetForThisOp != null && usedVarsSetForThisOp.contains(v))
+                        || survivedUnionSourceVarSet.contains(v)) {
+                    varIter.remove();
+                }
+            }
+        }
+
+        // The source variables of the UNIONALL operator should be survived
+        // since we are sure that the output of UNIONALL operator is used
+        // afterwards.
+        if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+            Iterator<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> iter = ((UnionAllOperator) opRef
+                    .getValue()).getVariableMappings().iterator();
+            while (iter.hasNext()) {
+                Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = iter.next();
+                survivedUnionSourceVarSet.add(varMapping.first);
+                survivedUnionSourceVarSet.add(varMapping.second);
+            }
+        }
+
+        return assignVarsSetForThisOp;
+    }
+
+    private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        while (removeFromAssigns(op, toRemove, context) == 0) {
+
+        Set<LogicalVariable> assignVarsSetForThisOp = removeAssignVarFromConsideration(opRef);
+
+        while (removeFromAssigns(op, assignVarsSetForThisOp, context) == 0) {
             if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
                 break;
             }
             op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
             opRef.setValue(op);
+            assignVarsSetForThisOp = removeAssignVarFromConsideration(opRef);
         }
+
         Iterator<Mutable<ILogicalOperator>> childIter = op.getInputs().iterator();
         while (childIter.hasNext()) {
             Mutable<ILogicalOperator> cRef = childIter.next();
-            removeUnusedAssigns(cRef, toRemove, context);
+            removeUnusedAssigns(cRef, context);
         }
+
         if (op.hasNestedPlans()) {
             AbstractOperatorWithNestedPlans opWithNest = (AbstractOperatorWithNestedPlans) op;
             Iterator<ILogicalPlan> planIter = opWithNest.getNestedPlans().iterator();
             while (planIter.hasNext()) {
                 ILogicalPlan p = planIter.next();
                 for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                    removeUnusedAssigns(r, toRemove, context);
+                    removeUnusedAssigns(r, context);
                 }
             }
 
@@ -125,31 +199,37 @@
                 AssignOperator assign = (AssignOperator) op;
                 if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
                     context.computeAndSetTypeEnvironmentForOperator(assign);
+                    isTransformed = true;
                 }
                 return assign.getVariables().size();
             case AGGREGATE:
                 AggregateOperator agg = (AggregateOperator) op;
                 if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
                     context.computeAndSetTypeEnvironmentForOperator(agg);
+                    isTransformed = true;
                 }
                 return agg.getVariables().size();
             case UNNEST:
                 UnnestOperator uOp = (UnnestOperator) op;
                 LogicalVariable pVar = uOp.getPositionalVariable();
-                if (pVar != null && toRemove.contains(pVar)) {
+                if (pVar != null && toRemove != null && toRemove.contains(pVar)) {
                     uOp.setPositionalVariable(null);
+                    assignedVarSet.remove(pVar);
+                    isTransformed = true;
                 }
                 break;
             case UNIONALL:
                 UnionAllOperator unionOp = (UnionAllOperator) op;
                 if (removeUnusedVarsFromUnionAll(unionOp, toRemove)) {
                     context.computeAndSetTypeEnvironmentForOperator(unionOp);
+                    isTransformed = true;
                 }
                 return unionOp.getVariableMappings().size();
             case GROUP:
                 GroupByOperator groupByOp = (GroupByOperator) op;
                 if (removeUnusedVarsFromGroupBy(groupByOp, toRemove)) {
                     context.computeAndSetTypeEnvironmentForOperator(groupByOp);
+                    isTransformed = true;
                 }
                 return groupByOp.getGroupByList().size() + groupByOp.getNestedPlans().size()
                         + groupByOp.getDecorList().size();
@@ -163,22 +243,28 @@
         Iterator<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> iter = unionOp.getVariableMappings()
                 .iterator();
         boolean modified = false;
-        Set<LogicalVariable> removeFromRemoveSet = new HashSet<LogicalVariable>();
-        while (iter.hasNext()) {
-            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = iter.next();
-            if (toRemove.contains(varMapping.third)) {
-                iter.remove();
-                modified = true;
+        if (toRemove != null && !toRemove.isEmpty()) {
+            while (iter.hasNext()) {
+                Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = iter.next();
+                if (toRemove.contains(varMapping.third)) {
+                    iter.remove();
+                    assignedVarSet.remove(varMapping.third);
+                    modified = true;
+                } else {
+                    // In case when the output variable of Union is survived,
+                    // the source variables should not be removed.
+                    survivedUnionSourceVarSet.add(varMapping.first);
+                    survivedUnionSourceVarSet.add(varMapping.second);
+                }
             }
-            // In any case, make sure we do not removing these variables.
-            removeFromRemoveSet.add(varMapping.first);
-            removeFromRemoveSet.add(varMapping.second);
         }
-        toRemove.removeAll(removeFromRemoveSet);
         return modified;
     }
 
     private boolean removeUnusedVarsFromGroupBy(GroupByOperator groupByOp, Set<LogicalVariable> toRemove) {
+        if (toRemove == null || toRemove.isEmpty()) {
+            return false;
+        }
         Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupByOp.getDecorList().iterator();
         boolean modified = false;
         while (iter.hasNext()) {
@@ -204,88 +290,127 @@
     private boolean removeUnusedVarsAndExprs(Set<LogicalVariable> toRemove, List<LogicalVariable> varList,
             List<Mutable<ILogicalExpression>> exprList) {
         boolean changed = false;
-        Iterator<LogicalVariable> varIter = varList.iterator();
-        Iterator<Mutable<ILogicalExpression>> exprIter = exprList.iterator();
-        while (varIter.hasNext()) {
-            LogicalVariable v = varIter.next();
-            exprIter.next();
-            if (toRemove.contains(v)) {
-                varIter.remove();
-                exprIter.remove();
-                changed = true;
+        if (toRemove != null && !toRemove.isEmpty()) {
+            Iterator<LogicalVariable> varIter = varList.iterator();
+            Iterator<Mutable<ILogicalExpression>> exprIter = exprList.iterator();
+            while (varIter.hasNext()) {
+                LogicalVariable v = varIter.next();
+                exprIter.next();
+                if (toRemove.contains(v)) {
+                    varIter.remove();
+                    exprIter.remove();
+                    assignedVarSet.remove(v);
+                    changed = true;
+                }
             }
         }
         return changed;
     }
 
-    private void collectUnusedAssignedVars(AbstractLogicalOperator op, Set<LogicalVariable> toRemove, boolean first,
-            IOptimizationContext context) throws AlgebricksException {
+    private void collectUnusedAssignedVars(Mutable<ILogicalOperator> opRef,
+            Set<LogicalVariable> accumulatedUsedVarFromRootSet, boolean first, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (!first) {
             context.addToDontApplySet(this, op);
         }
-        for (Mutable<ILogicalOperator> c : op.getInputs()) {
-            collectUnusedAssignedVars((AbstractLogicalOperator) c.getValue(), toRemove, false, context);
-        }
-        if (op.hasNestedPlans()) {
-            AbstractOperatorWithNestedPlans opWithNested = (AbstractOperatorWithNestedPlans) op;
-            for (ILogicalPlan plan : opWithNested.getNestedPlans()) {
-                for (Mutable<ILogicalOperator> r : plan.getRoots()) {
-                    collectUnusedAssignedVars((AbstractLogicalOperator) r.getValue(), toRemove, false, context);
-                }
-            }
-        }
-        boolean removeUsedVars = true;
-        Set<LogicalVariable> reBoundDecorVars = new HashSet<>();
+        Set<LogicalVariable> assignVarsSetInThisOp = new HashSet<>();
+        Set<LogicalVariable> usedVarsSetInThisOp = new HashSet<>();
+
+        // Add used variables in this operator to the accumulated used variables set?
+        boolean addUsedVarsInThisOp = true;
+        // ASSIGN, AGGREGATE, UNNEST, UNIONALL, or GROUP operator found?
+        boolean targetOpFound = false;
+
         switch (op.getOperatorTag()) {
             case ASSIGN:
                 AssignOperator assign = (AssignOperator) op;
-                toRemove.addAll(assign.getVariables());
+                assignVarsSetInThisOp.addAll(assign.getVariables());
+                targetOpFound = true;
                 break;
             case AGGREGATE:
                 AggregateOperator agg = (AggregateOperator) op;
-                toRemove.addAll(agg.getVariables());
+                assignVarsSetInThisOp.addAll(agg.getVariables());
+                targetOpFound = true;
                 break;
             case UNNEST:
                 UnnestOperator uOp = (UnnestOperator) op;
                 LogicalVariable pVar = uOp.getPositionalVariable();
                 if (pVar != null) {
-                    toRemove.add(pVar);
+                    assignVarsSetInThisOp.add(pVar);
+                    targetOpFound = true;
                 }
                 break;
             case UNIONALL:
                 UnionAllOperator unionOp = (UnionAllOperator) op;
                 for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : unionOp
                         .getVariableMappings()) {
-                    toRemove.add(varMapping.third);
+                    assignVarsSetInThisOp.add(varMapping.third);
                 }
-                removeUsedVars = false;
+                targetOpFound = true;
+                // Don't add used variables in UNIONALL.
+                addUsedVarsInThisOp = false;
                 break;
             case GROUP:
                 GroupByOperator groupByOp = (GroupByOperator) op;
                 for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorMapping : groupByOp.getDecorList()) {
                     LogicalVariable decorVar = decorMapping.first;
                     if (decorVar != null) {
-                        toRemove.add(decorVar);
+                        assignVarsSetInThisOp.add(decorVar);
+                        targetOpFound = true;
                     } else {
                         // A decor var mapping can have a variable reference expression without a new variable
                         // definition, which is for rebinding the referred variable.
                         VariableReferenceExpression varExpr = (VariableReferenceExpression) decorMapping.second
                                 .getValue();
                         LogicalVariable reboundDecorVar = varExpr.getVariableReference();
-                        toRemove.add(reboundDecorVar);
-                        reBoundDecorVars.add(reboundDecorVar);
+                        assignVarsSetInThisOp.add(reboundDecorVar);
                     }
                 }
                 break;
             default:
                 break;
         }
-        if (removeUsedVars) {
-            List<LogicalVariable> used = new LinkedList<LogicalVariable>();
-            VariableUtilities.getUsedVariables(op, used);
-            toRemove.removeAll(used);
-            toRemove.addAll(reBoundDecorVars);
+
+        if (targetOpFound) {
+            assignedVarMap.put(opRef, assignVarsSetInThisOp);
+            assignedVarSet.addAll(assignVarsSetInThisOp);
+        }
+
+        if (addUsedVarsInThisOp) {
+            VariableUtilities.getUsedVariables(op, usedVarsSetInThisOp);
+            accumulatedUsedVarFromRootSet.addAll(usedVarsSetInThisOp);
+            // We may have visited this operator before if there are multiple
+            // paths in the plan.
+            if (accumulatedUsedVarFromRootMap.containsKey(opRef)) {
+                accumulatedUsedVarFromRootMap.get(opRef).addAll(usedVarsSetInThisOp);
+            } else {
+                accumulatedUsedVarFromRootMap.put(opRef, new HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet));
+            }
+        } else {
+            accumulatedUsedVarFromRootMap.put(opRef, new HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet));
+        }
+
+        for (Mutable<ILogicalOperator> c : op.getInputs()) {
+            collectUnusedAssignedVars(c, new HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet), false, context);
+        }
+
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans opWithNested = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan plan : opWithNested.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r : plan.getRoots()) {
+                    collectUnusedAssignedVars(r, new HashSet<LogicalVariable>(accumulatedUsedVarFromRootSet), false,
+                            context);
+                }
+            }
         }
     }
 
+    private void clear() {
+        assignedVarMap.clear();
+        assignedVarSet.clear();
+        accumulatedUsedVarFromRootMap.clear();
+        survivedUnionSourceVarSet.clear();
+        isTransformed = false;
+    }
 }