Fix for ASTERIXDB-1018, ASTERIXDB-1017, ASTERIXDB-1019,
ASTERIXDB-1020, ASTERIXDB-1029, ASTERIXDB-1030, ASTERIXDB-1034:

1. Let the keys of introduced nested group-bys (group-bys in a subplan) be only the
variables that are produced in the subplan;

2. In PushSelectIntoJoinRule, push independent operators (e.g., a current-datetime() call)
into the first branch from which the join condition refers some variables.

3. In SimpleUnnestToJoinRule, move the boundary between the two join branches of a added join
which results from pipelined datascans to be below operators that doesn't use any variables
(e.g., a current-datetime() call), therefore potentially, the upper (left) join branch
can be rewritten to index lookups.

Change-Id: I18cfa3875d676f71b26e91433ff101a7e725c890
Reviewed-on: https://asterix-gerrit.ics.uci.edu/488
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
index 70342b6..874b751 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/AlgebricksBuiltinFunctions.java
@@ -54,6 +54,7 @@
     public final static FunctionIdentifier IS_NULL = new FunctionIdentifier(ALGEBRICKS_NS, "is-null", 1);
 
     private static final Map<FunctionIdentifier, ComparisonKind> comparisonFunctions = new HashMap<FunctionIdentifier, ComparisonKind>();
+
     static {
         comparisonFunctions.put(AlgebricksBuiltinFunctions.EQ, ComparisonKind.EQ);
         comparisonFunctions.put(AlgebricksBuiltinFunctions.LE, ComparisonKind.LE);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index cfb64d4..5e9eef6 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -69,7 +69,6 @@
 
     public AbstractLogicalOperator() {
         inputs = new ArrayList<Mutable<ILogicalOperator>>();
-        // outputs = new ArrayList<LogicalOperatorReference>();
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index e88b0da..7d3306e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -23,7 +23,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExternalDataLookupOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExternalDataLookupOperator.java
index fde5fe7..60aec27 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExternalDataLookupOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExternalDataLookupOperator.java
@@ -21,14 +21,13 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -40,7 +39,7 @@
 
     private final List<Object> variableTypes;
     protected final Mutable<ILogicalExpression> expression;
-    private final boolean propagateInput;
+    private boolean propagateInput;
 
     public ExternalDataLookupOperator(List<LogicalVariable> variables, Mutable<ILogicalExpression> expression,
             List<Object> variableTypes, boolean propagateInput, IDataSource<?> dataSource) {
@@ -85,6 +84,10 @@
         return propagateInput;
     }
 
+    public void setPropagateInput(boolean propagateInput) {
+        this.propagateInput = propagateInput;
+    }
+
     @Override
     public VariablePropagationPolicy getVariablePropagationPolicy() {
         return new VariablePropagationPolicy() {
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ScriptOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ScriptOperator.java
index ee2e7da..b04b28c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ScriptOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ScriptOperator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -34,22 +34,22 @@
 
 public class ScriptOperator extends AbstractLogicalOperator {
 
-    private ArrayList<LogicalVariable> inputVariables;
-    private ArrayList<LogicalVariable> outputVariables;
+    private List<LogicalVariable> inputVariables;
+    private List<LogicalVariable> outputVariables;
     private IScriptDescription scriptDesc;
 
-    public ScriptOperator(IScriptDescription scriptDesc, ArrayList<LogicalVariable> inputVariables,
-            ArrayList<LogicalVariable> outputVariables) {
+    public ScriptOperator(IScriptDescription scriptDesc, List<LogicalVariable> inputVariables,
+            List<LogicalVariable> outputVariables) {
         this.inputVariables = inputVariables;
         this.outputVariables = outputVariables;
         this.scriptDesc = scriptDesc;
     }
 
-    public ArrayList<LogicalVariable> getInputVariables() {
+    public List<LogicalVariable> getInputVariables() {
         return inputVariables;
     }
 
-    public ArrayList<LogicalVariable> getOutputVariables() {
+    public List<LogicalVariable> getOutputVariables() {
         return outputVariables;
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
index 881f6ae..815c36c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -105,6 +104,10 @@
         return propagateInput;
     }
 
+    public void setPropagatesInput(boolean propagateInput) {
+        this.propagateInput = propagateInput;
+    }
+
     public List<LogicalVariable> getMinFilterVars() {
         return minFilterVars;
     }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
new file mode 100644
index 0000000..d8089bd
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/**
+ * This visitor is to add back variables that are killed in the query plan rooted at an input operator.
+ * After visiting, it also provides a variable map for variables that have been
+ * mapped in the query plan, e.g., by group-by, assign, and union.
+ */
+class EnforceVariablesVisitor implements IQueryOperatorVisitor<ILogicalOperator, Collection<LogicalVariable>> {
+    private final IOptimizationContext context;
+    private final Map<LogicalVariable, LogicalVariable> inputVarToOutputVarMap = new HashMap<>();
+
+    public EnforceVariablesVisitor(IOptimizationContext context) {
+        this.context = context;
+    }
+
+    public Map<LogicalVariable, LogicalVariable> getInputVariableToOutputVariableMap() {
+        return inputVarToOutputVarMap;
+    }
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return rewriteAggregateOperator(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op,
+            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
+        return rewriteAggregateOperator(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
+            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        varsToRecover.removeAll(liveVars);
+
+        // Maps group by key variables if the corresponding expressions are VariableReferenceExpressions.
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> keyVarExprRef : op.getGroupByList()) {
+            ILogicalExpression expr = keyVarExprRef.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                LogicalVariable sourceVar = varExpr.getVariableReference();
+                updateVarMapping(sourceVar, keyVarExprRef.first);
+                varsToRecover.remove(sourceVar);
+            }
+        }
+
+        for (LogicalVariable varToRecover : varsToRecover) {
+            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
+            // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
+            op.getDecorList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
+        }
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
+            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitAssignOperator(AssignOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> assignedExprRefs = op.getExpressions();
+        List<LogicalVariable> assignedVars = op.getVariables();
+
+        // Maps assigning variables if assignment expressions are VariableReferenceExpressions.
+        for (int index = 0; index < assignedVars.size(); ++index) {
+            ILogicalExpression expr = assignedExprRefs.get(index).getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                LogicalVariable sourceVar = varExpr.getVariableReference();
+                updateVarMapping(sourceVar, assignedVars.get(index));
+                varsToRecover.remove(sourceVar);
+            }
+        }
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitSelectOperator(SelectOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitProjectOperator(ProjectOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        varsToRecover.removeAll(op.getVariables());
+        // Adds all missing variables that should propagates up.
+        op.getVariables().addAll(varsToRecover);
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op,
+            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitScriptOperator(ScriptOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
+    }
+
+    @Override
+    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        // Update the variable mappings
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = op.getVariableMappings();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varTriples) {
+            updateVarMapping(triple.second, triple.first);
+            updateVarMapping(triple.third, triple.first);
+        }
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        varsToRecover.remove(liveVars);
+        if (!varsToRecover.isEmpty()) {
+            op.setPropagatesInput(true);
+            return visitsInputs(op, varsToRecover);
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    @Override
+    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op,
+            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        varsToRecover.retainAll(liveVars);
+        if (!varsToRecover.isEmpty()) {
+            op.setPropagateInput(true);
+            return visitsInputs(op, varsToRecover);
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        return visitsInputs(op, varsToRecover);
+    }
+
+    /**
+     * Wraps an AggregateOperator or RunningAggregateOperator with a group-by operator where
+     * the group-by keys are variables in varsToRecover.
+     * Note that the function here prevents this visitor being used to rewrite arbitrary query plans.
+     * Instead, it could only be used for rewriting a nested plan within a subplan operator.
+     *
+     * @param op
+     *            the logical operator for aggregate or running aggregate.
+     * @param varsToRecover
+     *            the set of variables that needs to preserve.
+     * @return the wrapped group-by operator if {@code varsToRecover} is not empty, and {@code op} otherwise.
+     * @throws AlgebricksException
+     */
+    private ILogicalOperator rewriteAggregateOperator(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        varsToRecover.removeAll(liveVars);
+
+        GroupByOperator gbyOp = new GroupByOperator();
+        for (LogicalVariable varToRecover : varsToRecover) {
+            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
+            // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
+            LogicalVariable newVar = context.newVar();
+            gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
+            updateVarMapping(varToRecover, newVar);
+        }
+
+        NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
+        op.getInputs().clear();
+        op.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+
+        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+        ILogicalPlan nestedPlan = new ALogicalPlanImpl();
+        nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(op));
+        gbyOp.getNestedPlans().add(nestedPlan);
+        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
+
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(op, context);
+        return visitsInputs(gbyOp, varsToRecover);
+    }
+
+    private ILogicalOperator visitsInputs(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
+            throws AlgebricksException {
+        if (op.getInputs().size() == 0 || varsToRecover.isEmpty()) {
+            return op;
+        }
+        Set<LogicalVariable> producedVars = new HashSet<>();
+        VariableUtilities.getProducedVariables(op, producedVars);
+        varsToRecover.removeAll(producedVars);
+        if (!varsToRecover.isEmpty()) {
+            if (op.getInputs().size() == 1) {
+                // Deals with single input operators.
+                ILogicalOperator newOp = op.getInputs().get(0).getValue().accept(this, varsToRecover);
+                op.getInputs().get(0).setValue(newOp);
+            } else {
+                // Deals with multi-input operators.
+                for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+                    ILogicalOperator child = childRef.getValue();
+                    Set<LogicalVariable> varsToRecoverInChild = new HashSet<>();
+                    VariableUtilities.getProducedVariablesInDescendantsAndSelf(child, varsToRecoverInChild);
+                    // Obtains the variables that this particular child should propagate.
+                    varsToRecoverInChild.retainAll(varsToRecover);
+                    ILogicalOperator newChild = child.accept(this, varsToRecoverInChild);
+                    childRef.setValue(newChild);
+                }
+            }
+        }
+        return op;
+    }
+
+    private void updateVarMapping(LogicalVariable oldVar, LogicalVariable newVar) {
+        if (oldVar.equals(newVar)) {
+            return;
+        }
+        LogicalVariable mappedVar = newVar;
+        if (inputVarToOutputVarMap.containsKey(newVar)) {
+            mappedVar = inputVarToOutputVarMap.get(newVar);
+            inputVarToOutputVarMap.remove(newVar);
+        }
+        inputVarToOutputVarMap.put(oldVar, mappedVar);
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
new file mode 100644
index 0000000..f0bcd34
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+
+public class LogicalExpressionDeepCopyWithNewVariablesVisitor
+        implements ILogicalExpressionVisitor<ILogicalExpression, Void> {
+    private final IOptimizationContext context;
+    private final Map<LogicalVariable, LogicalVariable> inVarMapping;
+    private final Map<LogicalVariable, LogicalVariable> outVarMapping;
+
+    public LogicalExpressionDeepCopyWithNewVariablesVisitor(IOptimizationContext context,
+            Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping) {
+        this.context = context;
+        this.inVarMapping = inVarMapping;
+        this.outVarMapping = variableMapping;
+    }
+
+    public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException {
+        return expr.accept(this, null);
+    }
+
+    private void deepCopyAnnotations(AbstractFunctionCallExpression src, AbstractFunctionCallExpression dest) {
+        Map<Object, IExpressionAnnotation> srcAnnotations = src.getAnnotations();
+        Map<Object, IExpressionAnnotation> destAnnotations = dest.getAnnotations();
+        for (Object k : srcAnnotations.keySet()) {
+            IExpressionAnnotation annotation = srcAnnotations.get(k).copy();
+            destAnnotations.put(k, annotation);
+        }
+    }
+
+    private void deepCopyOpaqueParameters(AbstractFunctionCallExpression src, AbstractFunctionCallExpression dest) {
+        Object[] srcOpaqueParameters = src.getOpaqueParameters();
+        Object[] newOpaqueParameters = null;
+        if (srcOpaqueParameters != null) {
+            newOpaqueParameters = new Object[srcOpaqueParameters.length];
+            for (int i = 0; i < srcOpaqueParameters.length; i++) {
+                newOpaqueParameters[i] = srcOpaqueParameters[i];
+            }
+        }
+        dest.setOpaqueParameters(newOpaqueParameters);
+    }
+
+    public MutableObject<ILogicalExpression> deepCopyExpressionReference(Mutable<ILogicalExpression> exprRef)
+            throws AlgebricksException {
+        return new MutableObject<ILogicalExpression>(deepCopy(exprRef.getValue()));
+    }
+
+    // TODO return List<...>
+    public ArrayList<Mutable<ILogicalExpression>> deepCopyExpressionReferenceList(
+            List<Mutable<ILogicalExpression>> list) throws AlgebricksException {
+        ArrayList<Mutable<ILogicalExpression>> listCopy = new ArrayList<Mutable<ILogicalExpression>>(list.size());
+        for (Mutable<ILogicalExpression> exprRef : list) {
+            listCopy.add(deepCopyExpressionReference(exprRef));
+        }
+        return listCopy;
+    }
+
+    @Override
+    public ILogicalExpression visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        AggregateFunctionCallExpression exprCopy = new AggregateFunctionCallExpression(expr.getFunctionInfo(),
+                expr.isTwoStep(), deepCopyExpressionReferenceList(expr.getArguments()));
+        deepCopyAnnotations(expr, exprCopy);
+        deepCopyOpaqueParameters(expr, exprCopy);
+        return exprCopy;
+    }
+
+    @Override
+    public ILogicalExpression visitConstantExpression(ConstantExpression expr, Void arg) throws AlgebricksException {
+        return new ConstantExpression(expr.getValue());
+    }
+
+    @Override
+    public ILogicalExpression visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        ScalarFunctionCallExpression exprCopy = new ScalarFunctionCallExpression(expr.getFunctionInfo(),
+                deepCopyExpressionReferenceList(expr.getArguments()));
+        deepCopyAnnotations(expr, exprCopy);
+        deepCopyOpaqueParameters(expr, exprCopy);
+        return exprCopy;
+
+    }
+
+    @Override
+    public ILogicalExpression visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ILogicalExpression visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr, Void arg)
+            throws AlgebricksException {
+        UnnestingFunctionCallExpression exprCopy = new UnnestingFunctionCallExpression(expr.getFunctionInfo(),
+                deepCopyExpressionReferenceList(expr.getArguments()));
+        deepCopyAnnotations(expr, exprCopy);
+        deepCopyOpaqueParameters(expr, exprCopy);
+        return exprCopy;
+    }
+
+    @Override
+    public ILogicalExpression visitVariableReferenceExpression(VariableReferenceExpression expr, Void arg)
+            throws AlgebricksException {
+        LogicalVariable var = expr.getVariableReference();
+        LogicalVariable givenVarReplacement = inVarMapping.get(var);
+        if (givenVarReplacement != null) {
+            outVarMapping.put(var, givenVarReplacement);
+            return new VariableReferenceExpression(givenVarReplacement);
+        }
+        LogicalVariable varCopy = outVarMapping.get(var);
+        if (varCopy == null) {
+            varCopy = context.newVar();
+            outVarMapping.put(var, varCopy);
+        }
+        return new VariableReferenceExpression(varCopy);
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
new file mode 100644
index 0000000..ec7d7fe
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/**
+ * This visitor deep-copies a query plan but uses a new set of variables.
+ * Method getInputToOutputVariableMapping() will return a map that maps
+ * input variables to their corresponding output variables.
+ */
+public class LogicalOperatorDeepCopyWithNewVariablesVisitor
+        implements IQueryOperatorVisitor<ILogicalOperator, ILogicalOperator> {
+    private final IOptimizationContext context;
+    private final LogicalExpressionDeepCopyWithNewVariablesVisitor exprDeepCopyVisitor;
+
+    // Key: Variable in the original plan. Value: New variable replacing the
+    // original one in the copied plan.
+    private final Map<LogicalVariable, LogicalVariable> inputVarToOutputVarMapping;
+
+    // Key: New variable in the new plan. Value: The old variable in the original plan.
+    private final Map<LogicalVariable, LogicalVariable> outputVarToInputVarMapping;
+
+    /**
+     * @param IOptimizationContext,
+     *            the optimization context
+     */
+    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IOptimizationContext context) {
+        this.context = context;
+        this.inputVarToOutputVarMapping = new HashMap<>();
+        this.outputVarToInputVarMapping = new HashMap<>();
+        this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(context,
+                outputVarToInputVarMapping, inputVarToOutputVarMapping);
+    }
+
+    /**
+     * @param IOptimizationContext
+     *            the optimization context
+     * @param inVarMapping
+     *            Variable mapping keyed by variables in the original plan.
+     *            Those variables are replaced by their corresponding value in
+     *            the map in the copied plan.
+     */
+    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IOptimizationContext context,
+            Map<LogicalVariable, LogicalVariable> inVarMapping) {
+        this.context = context;
+        this.inputVarToOutputVarMapping = inVarMapping;
+        this.outputVarToInputVarMapping = new HashMap<>();
+        exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(context, inVarMapping,
+                inputVarToOutputVarMapping);
+    }
+
+    private void copyAnnotations(ILogicalOperator src, ILogicalOperator dest) {
+        dest.getAnnotations().putAll(src.getAnnotations());
+    }
+
+    public ILogicalOperator deepCopy(ILogicalOperator op, ILogicalOperator arg) throws AlgebricksException {
+        ILogicalOperator opCopy = op.accept(this, arg);
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(opCopy, context);
+        return opCopy;
+    }
+
+    private void deepCopyInputs(ILogicalOperator src, ILogicalOperator dest, ILogicalOperator arg)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> inputs = src.getInputs();
+        List<Mutable<ILogicalOperator>> inputsCopy = dest.getInputs();
+        for (Mutable<ILogicalOperator> input : inputs) {
+            inputsCopy.add(deepCopyOperatorReference(input, arg));
+        }
+    }
+
+    private Mutable<ILogicalOperator> deepCopyOperatorReference(Mutable<ILogicalOperator> opRef, ILogicalOperator arg)
+            throws AlgebricksException {
+        return new MutableObject<ILogicalOperator>(deepCopy(opRef.getValue(), arg));
+    }
+
+    private List<Mutable<ILogicalOperator>> deepCopyOperatorReferenceList(List<Mutable<ILogicalOperator>> list,
+            ILogicalOperator arg) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> listCopy = new ArrayList<Mutable<ILogicalOperator>>(list.size());
+        for (Mutable<ILogicalOperator> opRef : list) {
+            listCopy.add(deepCopyOperatorReference(opRef, arg));
+        }
+        return listCopy;
+    }
+
+    private IOrder deepCopyOrder(IOrder order) {
+        switch (order.getKind()) {
+            case ASC:
+            case DESC:
+                return order;
+            case FUNCTIONCALL:
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderExpressionReferencePairList(
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> list) throws AlgebricksException {
+        ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>> listCopy = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>(
+                list.size());
+        for (Pair<IOrder, Mutable<ILogicalExpression>> pair : list) {
+            listCopy.add(new Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>>(deepCopyOrder(pair.first),
+                    exprDeepCopyVisitor.deepCopyExpressionReference(pair.second)));
+        }
+        return listCopy;
+    }
+
+    private ILogicalPlan deepCopyPlan(ILogicalPlan plan, ILogicalOperator arg) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> rootsCopy = deepCopyOperatorReferenceList(plan.getRoots(), arg);
+        ILogicalPlan planCopy = new ALogicalPlanImpl(rootsCopy);
+        return planCopy;
+    }
+
+    private List<ILogicalPlan> deepCopyPlanList(List<ILogicalPlan> list, List<ILogicalPlan> listCopy,
+            ILogicalOperator arg) throws AlgebricksException {
+        for (ILogicalPlan plan : list) {
+            listCopy.add(deepCopyPlan(plan, arg));
+        }
+        return listCopy;
+    }
+
+    private LogicalVariable deepCopyVariable(LogicalVariable var) {
+        if (var == null) {
+            return null;
+        }
+        LogicalVariable givenVarReplacement = outputVarToInputVarMapping.get(var);
+        if (givenVarReplacement != null) {
+            inputVarToOutputVarMapping.put(var, givenVarReplacement);
+            return givenVarReplacement;
+        }
+        LogicalVariable varCopy = inputVarToOutputVarMapping.get(var);
+        if (varCopy == null) {
+            varCopy = context.newVar();
+            inputVarToOutputVarMapping.put(var, varCopy);
+        }
+        return varCopy;
+    }
+
+    private List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> deepCopyVariableExpressionReferencePairList(
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> list) throws AlgebricksException {
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> listCopy = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>(
+                list.size());
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : list) {
+            listCopy.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(deepCopyVariable(pair.first),
+                    exprDeepCopyVisitor.deepCopyExpressionReference(pair.second)));
+        }
+        return listCopy;
+    }
+
+    private List<LogicalVariable> deepCopyVariableList(List<LogicalVariable> list) {
+        ArrayList<LogicalVariable> listCopy = new ArrayList<LogicalVariable>(list.size());
+        for (LogicalVariable var : list) {
+            listCopy.add(deepCopyVariable(var));
+        }
+        return listCopy;
+    }
+
+    private void deepCopyInputsAnnotationsAndExecutionMode(ILogicalOperator op, ILogicalOperator arg,
+            AbstractLogicalOperator opCopy) throws AlgebricksException {
+        deepCopyInputs(op, opCopy, arg);
+        copyAnnotations(op, opCopy);
+        opCopy.setExecutionMode(op.getExecutionMode());
+    }
+
+    public void reset() {
+        inputVarToOutputVarMapping.clear();
+        outputVarToInputVarMapping.clear();
+    }
+
+    public void updatePrimaryKeys(IOptimizationContext context) {
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : inputVarToOutputVarMapping.entrySet()) {
+            List<LogicalVariable> primaryKey = context.findPrimaryKey(entry.getKey());
+            if (primaryKey != null) {
+                List<LogicalVariable> head = new ArrayList<LogicalVariable>();
+                for (LogicalVariable variable : primaryKey) {
+                    head.add(inputVarToOutputVarMapping.get(variable));
+                }
+                List<LogicalVariable> tail = new ArrayList<LogicalVariable>(1);
+                tail.add(entry.getValue());
+                context.addPrimaryKey(new FunctionalDependency(head, tail));
+            }
+        }
+    }
+
+    public LogicalVariable varCopy(LogicalVariable var) throws AlgebricksException {
+        return inputVarToOutputVarMapping.get(var);
+    }
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AggregateOperator opCopy = new AggregateOperator(deepCopyVariableList(op.getVariables()),
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitAssignOperator(AssignOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AssignOperator opCopy = new AssignOperator(deepCopyVariableList(op.getVariables()),
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        DataSourceScanOperator opCopy = new DataSourceScanOperator(deepCopyVariableList(op.getVariables()),
+                op.getDataSource());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitDistinctOperator(DistinctOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        DistinctOperator opCopy = new DistinctOperator(
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalOperator arg) {
+        EmptyTupleSourceOperator opCopy = new EmptyTupleSourceOperator();
+        opCopy.setExecutionMode(op.getExecutionMode());
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        ExchangeOperator opCopy = new ExchangeOperator();
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, ILogicalOperator arg) throws AlgebricksException {
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByListCopy = deepCopyVariableExpressionReferencePairList(
+                op.getGroupByList());
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorListCopy = deepCopyVariableExpressionReferencePairList(
+                op.getDecorList());
+        List<ILogicalPlan> nestedPlansCopy = new ArrayList<ILogicalPlan>();
+
+        GroupByOperator opCopy = new GroupByOperator(groupByListCopy, decorListCopy, nestedPlansCopy);
+        deepCopyPlanList(op.getNestedPlans(), nestedPlansCopy, opCopy);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        InnerJoinOperator opCopy = new InnerJoinOperator(
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getCondition()),
+                deepCopyOperatorReference(op.getInputs().get(0), null),
+                deepCopyOperatorReference(op.getInputs().get(1), null));
+        copyAnnotations(op, opCopy);
+        opCopy.setExecutionMode(op.getExecutionMode());
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        LeftOuterJoinOperator opCopy = new LeftOuterJoinOperator(
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getCondition()),
+                deepCopyOperatorReference(op.getInputs().get(0), null),
+                deepCopyOperatorReference(op.getInputs().get(1), null));
+        copyAnnotations(op, opCopy);
+        opCopy.setExecutionMode(op.getExecutionMode());
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, ILogicalOperator arg) throws AlgebricksException {
+        LimitOperator opCopy = new LimitOperator(exprDeepCopyVisitor.deepCopy(op.getMaxObjects().getValue()),
+                exprDeepCopyVisitor.deepCopy(op.getOffset().getValue()), op.isTopmostLimitOp());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(arg));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, ILogicalOperator arg) throws AlgebricksException {
+        OrderOperator opCopy = new OrderOperator(deepCopyOrderExpressionReferencePairList(op.getOrderExpressions()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        PartitioningSplitOperator opCopy = new PartitioningSplitOperator(
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()), op.getDefaultBranchIndex());
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitProjectOperator(ProjectOperator op, ILogicalOperator arg) throws AlgebricksException {
+        ProjectOperator opCopy = new ProjectOperator(deepCopyVariableList(op.getVariables()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        boolean[] outputMatFlags = op.getOutputMaterializationFlags();
+        boolean[] copiedOutputMatFlags = new boolean[outputMatFlags.length];
+        System.arraycopy(outputMatFlags, 0, copiedOutputMatFlags, 0, outputMatFlags.length);
+        ReplicateOperator opCopy = new ReplicateOperator(op.getOutputArity(), copiedOutputMatFlags);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        MaterializeOperator opCopy = new MaterializeOperator();
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        RunningAggregateOperator opCopy = new RunningAggregateOperator(deepCopyVariableList(op.getVariables()),
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitScriptOperator(ScriptOperator op, ILogicalOperator arg) throws AlgebricksException {
+        ScriptOperator opCopy = new ScriptOperator(op.getScriptDescription(),
+                deepCopyVariableList(op.getInputVariables()), deepCopyVariableList(op.getOutputVariables()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitSelectOperator(SelectOperator op, ILogicalOperator arg) throws AlgebricksException {
+        SelectOperator opCopy = new SelectOperator(exprDeepCopyVisitor.deepCopyExpressionReference(op.getCondition()),
+                op.getRetainNull(), deepCopyVariable(op.getNullPlaceholderVariable()));
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitSubplanOperator(SubplanOperator op, ILogicalOperator arg) throws AlgebricksException {
+        List<ILogicalPlan> nestedPlansCopy = new ArrayList<ILogicalPlan>();
+        SubplanOperator opCopy = new SubplanOperator(nestedPlansCopy);
+        deepCopyPlanList(op.getNestedPlans(), nestedPlansCopy, opCopy);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitUnionOperator(UnionAllOperator op, ILogicalOperator arg) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> copiedInputs = new ArrayList<>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            copiedInputs.add(deepCopyOperatorReference(childRef, null));
+        }
+        List<List<LogicalVariable>> liveVarsInInputs = new ArrayList<>();
+        for (Mutable<ILogicalOperator> inputOpRef : copiedInputs) {
+            List<LogicalVariable> liveVars = new ArrayList<>();
+            VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars);
+            liveVarsInInputs.add(liveVars);
+        }
+        List<LogicalVariable> liveVarsInLeftInput = liveVarsInInputs.get(0);
+        List<LogicalVariable> liveVarsInRightInput = liveVarsInInputs.get(1);
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> copiedTriples = new ArrayList<>();
+        int index = 0;
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : op.getVariableMappings()) {
+            LogicalVariable producedVar = deepCopyVariable(triple.third);
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> copiedTriple = new Triple<>(
+                    liveVarsInLeftInput.get(index), liveVarsInRightInput.get(index), producedVar);
+            copiedTriples.add(copiedTriple);
+            ++index;
+        }
+        UnionAllOperator opCopy = new UnionAllOperator(copiedTriples);
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()), op.getVariableTypes(),
+                op.propagatesInput());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
+        UnnestOperator opCopy = new UnnestOperator(deepCopyVariable(op.getVariable()),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()),
+                deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType(), op.getPositionWriter());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        TokenizeOperator opCopy = new TokenizeOperator(op.getDataSourceIndex(),
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getPrimaryKeyExpressions()),
+                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getSecondaryKeyExpressions()),
+                this.deepCopyVariableList(op.getTokenizeVars()),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getFilterExpression()), op.getOperation(),
+                op.isBulkload(), op.isPartitioned(), op.getTokenizeVarTypes());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
+    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        ExternalDataLookupOperator opCopy = new ExternalDataLookupOperator(deepCopyVariableList(op.getVariables()),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()), op.getVariableTypes(),
+                op.isPropagateInput(), op.getDataSource());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+
+    }
+
+    @Override
+    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        OuterUnnestOperator opCopy = new OuterUnnestOperator(deepCopyVariable(op.getVariable()),
+                exprDeepCopyVisitor.deepCopyExpressionReference(op.getExpressionRef()),
+                deepCopyVariable(op.getPositionalVariable()), op.getPositionalVariableType(), op.getPositionWriter());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    public Map<LogicalVariable, LogicalVariable> getOutputToInputVariableMapping() {
+        return outputVarToInputVarMapping;
+    }
+
+    public Map<LogicalVariable, LogicalVariable> getInputToOutputVariableMapping() {
+        return inputVarToOutputVarMapping;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 2268a9e..ddcf6c8 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -18,11 +18,9 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -31,7 +29,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestNonMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -69,10 +66,10 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class SubstituteVariableVisitor implements ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> {
+public class SubstituteVariableVisitor
+        implements ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> {
 
     private final boolean goThroughNts;
     private final ITypingContext ctx;
@@ -149,7 +146,8 @@
     }
 
     @Override
-    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) {
         // does not use any variable
         return null;
     }
@@ -167,8 +165,7 @@
         subst(pair.first, pair.second, op.getDecorList());
         for (ILogicalPlan p : op.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
-                        pair.second, goThroughNts, ctx);
+                VariableUtilities.substituteVariablesInDescendantsAndSelf(r.getValue(), pair.first, pair.second, ctx);
             }
         }
         substVarTypes(op, pair);
@@ -204,8 +201,8 @@
     }
 
     @Override
-    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         return null;
     }
 
@@ -220,8 +217,8 @@
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         for (Mutable<ILogicalExpression> e : op.getExpressions()) {
             e.getValue().substituteVar(pair.first, pair.second);
         }
@@ -280,8 +277,7 @@
             throws AlgebricksException {
         for (ILogicalPlan p : op.getNestedPlans()) {
             for (Mutable<ILogicalOperator> r : p.getRoots()) {
-                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
-                        pair.second, goThroughNts, ctx);
+                VariableUtilities.substituteVariablesInDescendantsAndSelf(r.getValue(), pair.first, pair.second, ctx);
             }
         }
         return null;
@@ -369,7 +365,7 @@
         }
     }
 
-    private void substInArray(ArrayList<LogicalVariable> varArray, LogicalVariable v1, LogicalVariable v2) {
+    private void substInArray(List<LogicalVariable> varArray, LogicalVariable v1, LogicalVariable v2) {
         for (int i = 0; i < varArray.size(); i++) {
             LogicalVariable v = varArray.get(i);
             if (v == v1) {
@@ -403,8 +399,8 @@
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
+    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().substituteVar(pair.first, pair.second);
         }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index a3ea549..70eb544 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -21,15 +21,17 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class VariableUtilities {
@@ -98,4 +100,32 @@
         return varSet.equals(varArgSet);
     }
 
+    /**
+     * Recursively modifies the query plan to make sure every variable in {@code varsToEnforce}
+     * be part of the output schema of {@code opRef}.
+     *
+     * @param opRef,
+     *            the operator to enforce.
+     * @param varsToEnforce,
+     *            the variables that needs to be live after the operator of {@code opRef}.
+     * @param context,
+     *            the optimization context.
+     * @return a map that maps a variable in {@code varsToEnforce} to yet-another-variable if
+     *         a mapping happens in the query plan under {@code opRef}, e.g., by grouping and assigning.
+     * @throws AlgebricksException
+     */
+    public static Map<LogicalVariable, LogicalVariable> enforceVariablesInDescendantsAndSelf(
+            Mutable<ILogicalOperator> opRef, Collection<LogicalVariable> varsToEnforce, IOptimizationContext context)
+                    throws AlgebricksException {
+        Set<LogicalVariable> copiedVarsToEnforce = new HashSet<>();
+        copiedVarsToEnforce.addAll(varsToEnforce);
+        // Rewrites the query plan
+        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(context);
+        ILogicalOperator result = opRef.getValue().accept(visitor, copiedVarsToEnforce);
+        opRef.setValue(result);
+        // Re-computes the type environment bottom up.
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(result, context);
+        return visitor.getInputVariableToOutputVariableMap();
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 74200b4..27d06f0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -61,16 +61,19 @@
         return getOperatorTag().toString();
     }
 
+    @Override
     public void setHostQueryContext(Object context) {
         this.hostQueryContext = context;
     }
 
+    @Override
     public Object getHostQueryContext() {
         return hostQueryContext;
     }
 
     protected PhysicalRequirements emptyUnaryRequirements() {
-        StructuralPropertiesVector[] req = new StructuralPropertiesVector[] { StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
+        StructuralPropertiesVector[] req = new StructuralPropertiesVector[] {
+                StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
         return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
@@ -103,7 +106,8 @@
         return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
     }
 
-    protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op, IOperatorDescriptor opDesc) {
+    protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op,
+            IOperatorDescriptor opDesc) {
         if (op.getExecutionMode() == ExecutionMode.UNPARTITIONED) {
             AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
             builder.contributeAlgebricksPartitionConstraint(opDesc, apc);
@@ -113,7 +117,7 @@
 
     protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
             AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()];
         PlanCompiler pc = new PlanCompiler(context);
         int i = 0;
@@ -124,7 +128,8 @@
     }
 
     private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
-            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc)
+                    throws AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots are not supported.");
         }
@@ -136,10 +141,8 @@
 
         Map<OperatorDescriptorId, IOperatorDescriptor> opMap = nestedJob.getOperatorMap();
         if (opMap.size() != 1) {
-            throw new AlgebricksException(
-                    "Attempting to construct a nested plan with "
-                            + opMap.size()
-                            + " operator descriptors. Currently, nested plans can only consist in linear pipelines of Asterix micro operators.");
+            throw new AlgebricksException("Attempting to construct a nested plan with " + opMap.size()
+                    + " operator descriptors. Currently, nested plans can only consist in linear pipelines of Asterix micro operators.");
         }
 
         for (OperatorDescriptorId oid : opMap.keySet()) {
@@ -158,4 +161,4 @@
 
         throw new IllegalStateException();
     }
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index ecb7186..617e7ec 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
@@ -85,7 +84,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         SubplanOperator subplan = (SubplanOperator) op;
         if (subplan.getNestedPlans().size() != 1) {
             throw new NotImplementedException("Subplan currently works only for one nested plan with one root.");
@@ -93,8 +92,8 @@
         AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], subplan, opSchema, context);
         assert (subplans.length == 1);
         AlgebricksPipeline np = subplans[0];
-        RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op.getInputs().get(0).getValue()),
-                inputSchemas[0], context);
+        RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
+                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
         INullWriterFactory[] nullWriterFactories = new INullWriterFactory[np.getOutputWidth()];
         for (int i = 0; i < nullWriterFactories.length; i++) {
             nullWriterFactories[i] = context.getNullWriterFactory();
@@ -112,4 +111,4 @@
     public boolean expensiveThanMaterialization() {
         return true;
     }
-}
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 61a191f..b4569e4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -23,7 +23,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -226,4 +225,21 @@
         return op.accept(visitor, null);
     }
 
+    /**
+     * Compute type environment of a newly generated operator {@code op} and its input.
+     *
+     * @param op,
+     *            the logical operator.
+     * @param context,the
+     *            optimization context.
+     * @throws AlgebricksException
+     */
+    public static void computeTypeEnvironmentBottomUp(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
+        for (Mutable<ILogicalOperator> children : op.getInputs()) {
+            computeTypeEnvironmentBottomUp(children.getValue(), context);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
new file mode 100644
index 0000000..b5ce212
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.visitors;
+
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+
+public interface IQueryOperatorVisitor<R, T> extends ILogicalOperatorVisitor<R, T> {
+
+    @Override
+    public default R visitWriteOperator(WriteOperator op, T arg) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public default R visitDistributeResultOperator(DistributeResultOperator op, T arg) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public default R visitWriteResultOperator(WriteResultOperator op, T arg) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public default R visitInsertDeleteOperator(InsertDeleteOperator op, T arg) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public default R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T arg) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public default R visitSinkOperator(SinkOperator op, T arg) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
index 83740de..128c372 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
@@ -24,8 +24,34 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 
 public interface IAlgebraicRewriteRule {
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException;
 
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException;
+    /**
+     * This method is invoked in pre-order traversal of a query plan.
+     *
+     * @param opRef,
+     *            the reference of the current operator to look at,
+     * @param context
+     *            the optimization context
+     * @return true if any change is introduced to the query plan; false otherwise.
+     * @throws AlgebricksException
+     */
+    public default boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * This method is invoked in the post-order traversal of a query plan.
+     *
+     * @param opRef,
+     *            the reference of the current operator to look at,
+     * @param context
+     *            the optimization context
+     * @return true if any change is introduced to the query plan; false otherwise.
+     * @throws AlgebricksException
+     */
+    public default boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
 }
diff --git a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java b/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
new file mode 100644
index 0000000..d0676cf
--- /dev/null
+++ b/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class EnforceVariablesVisitorTest {
+
+    /**
+     * Tests the processing of project operator in RecoverVariablesVisitor.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testProject() throws Exception {
+        // Constructs the input operator.
+        LogicalVariable var = new LogicalVariable(1);
+        List<LogicalVariable> inputVarList = new ArrayList<>();
+        inputVarList.add(var);
+        ProjectOperator projectOp = new ProjectOperator(inputVarList);
+
+        // Constructs the visitor.
+        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
+        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
+
+        // Calls the visitor.
+        LogicalVariable varToEnforce = new LogicalVariable(2);
+        ProjectOperator op = (ProjectOperator) projectOp.accept(visitor,
+                Arrays.asList(new LogicalVariable[] { varToEnforce }));
+
+        // Checks the result.
+        List<LogicalVariable> expectedVars = Arrays.asList(new LogicalVariable[] { var, varToEnforce });
+        Assert.assertEquals(expectedVars, op.getVariables());
+        Assert.assertTrue(visitor.getInputVariableToOutputVariableMap().isEmpty());
+    }
+
+    /**
+     * Tests the processing of group-by operator in RecoverVariablesVisitor.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testGroupby() throws Exception {
+        // Constructs the group-by operator.
+        LogicalVariable keyVar = new LogicalVariable(2);
+        LogicalVariable keyExprVar = new LogicalVariable(1);
+        GroupByOperator gbyOp = new GroupByOperator();
+        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
+                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
+
+        // Constructs the visitor.
+        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
+        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
+
+        // Calls the visitor.
+        LogicalVariable varToEnforce = new LogicalVariable(3);
+        Set<LogicalVariable> varsToEnforce = new HashSet<>();
+        varsToEnforce.add(keyExprVar);
+        varsToEnforce.add(varToEnforce);
+        GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
+
+        // Checks the result.
+        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
+        expectedVarMap.put(keyExprVar, keyVar);
+        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
+        VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
+                .getValue();
+        Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
+    }
+
+    /**
+     * Tests the processing of aggregate operator in RecoverVariablesVisitor.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAggregate() throws Exception {
+        // Constructs the group-by operator.
+        List<LogicalVariable> aggVars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> aggExprRefs = new ArrayList<>();
+        AggregateOperator aggOp = new AggregateOperator(aggVars, aggExprRefs);
+
+        // Constructs the visitor.
+        LogicalVariable var = new LogicalVariable(3);
+        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
+        when(mockedContext.newVar()).thenReturn(var);
+        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
+
+        // Calls the visitor.
+        LogicalVariable varToEnforce = new LogicalVariable(2);
+        Set<LogicalVariable> varsToEnforce = new HashSet<>();
+        varsToEnforce.add(varToEnforce);
+        GroupByOperator op = (GroupByOperator) aggOp.accept(visitor, varsToEnforce);
+
+        // Checks the result.
+        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
+        expectedVarMap.put(varToEnforce, var);
+        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
+        VariableReferenceExpression keyExpr = (VariableReferenceExpression) op.getGroupByList().get(0).second
+                .getValue();
+        Assert.assertEquals(keyExpr.getVariableReference(), varToEnforce);
+        LogicalVariable expectedGbyVar = op.getGroupByList().get(0).first;
+        Assert.assertEquals(expectedGbyVar, var);
+    }
+
+    /**
+     * Tests the processing of two serial group-by operators in RecoverVariablesVisitor.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testTwoGroupbys() throws Exception {
+        // Constructs the group-by operators.
+        LogicalVariable keyVar = new LogicalVariable(1);
+        LogicalVariable keyExprVar = new LogicalVariable(2);
+        GroupByOperator gbyOp = new GroupByOperator();
+        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
+                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
+        LogicalVariable keyVar2 = new LogicalVariable(2);
+        LogicalVariable keyExprVar2 = new LogicalVariable(3);
+        GroupByOperator gbyOp2 = new GroupByOperator();
+        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar2,
+                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar2))));
+        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(gbyOp2));
+
+        // Constructs the visitor.
+        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
+        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
+
+        // Calls the visitor.
+        LogicalVariable varToEnforce = new LogicalVariable(4);
+        Set<LogicalVariable> varsToEnforce = new HashSet<>();
+        varsToEnforce.add(keyExprVar2);
+        varsToEnforce.add(varToEnforce);
+        GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
+
+        // Checks the result.
+        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
+        expectedVarMap.put(keyExprVar2, keyVar);
+        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
+        VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
+                .getValue();
+        Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
+        GroupByOperator op2 = (GroupByOperator) op.getInputs().get(0).getValue();
+        VariableReferenceExpression decorVarExpr2 = (VariableReferenceExpression) op2.getDecorList().get(0).second
+                .getValue();
+        Assert.assertEquals(decorVarExpr2.getVariableReference(), varToEnforce);
+    }
+
+}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
index 6ff1477..952c7c1 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.algebricks.rewriter.rules.ComplexJoinInferenceRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ConsolidateAssignsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ConsolidateSelectsRule;
-import org.apache.hyracks.algebricks.rewriter.rules.EliminateSubplanRule;
 import org.apache.hyracks.algebricks.rewriter.rules.EnforceStructuralPropertiesRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ExtractCommonOperatorsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.ExtractGbyExpressionsRule;
@@ -45,6 +44,7 @@
 import org.apache.hyracks.algebricks.rewriter.rules.RemoveUnusedAssignAndAggregateRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SetAlgebricksPhysicalOperatorsRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
+import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateSubplanRule;
 
 public class PigletRewriteRuleset {
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 1ecb893..f2645f5 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -85,7 +85,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         if (hasRun) {
             return false;
         }
@@ -145,7 +146,7 @@
             List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
             for (int i = 0; i < vars.size(); i++) {
                 ILogicalExpression expr = exprs.get(i).getValue();
-                // Ignore functions that are either in the doNotInline set or are non-functional               
+                // Ignore functions that are either in the doNotInline set or are non-functional
                 if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                     AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
                     if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())
@@ -174,6 +175,14 @@
             }
         }
 
+        // References to variables generated in the right branch of a left-outer-join cannot be inlined
+        // in operators above the left-outer-join.
+        if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+            Set<LogicalVariable> rightLiveVars = new HashSet<LogicalVariable>();
+            VariableUtilities.getLiveVariables(op.getInputs().get(1).getValue(), rightLiveVars);
+            varAssignRhs.keySet().removeAll(rightLiveVars);
+        }
+
         if (performBottomUpAction(op)) {
             modified = true;
         }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
index d6f1889..12e5ebe 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -64,6 +63,7 @@
 
         List<ILogicalOperator> pushedOnLeft = new ArrayList<ILogicalOperator>();
         List<ILogicalOperator> pushedOnRight = new ArrayList<ILogicalOperator>();
+        List<ILogicalOperator> pushedOnEither = new ArrayList<ILogicalOperator>();
         LinkedList<ILogicalOperator> notPushedStack = new LinkedList<ILogicalOperator>();
         Collection<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
         Collection<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
@@ -107,7 +107,9 @@
                 } else {
                     VariableUtilities.getUsedVariables(opIter, usedVars);
                     VariableUtilities.getProducedVariables(opIter, producedVars);
-                    if (joinLiveVarsLeft.containsAll(usedVars)) {
+                    if (usedVars.size() == 0) {
+                        pushedOnEither.add(opIter);
+                    } else if (joinLiveVarsLeft.containsAll(usedVars)) {
                         pushedOnLeft.add(opIter);
                         liveInOpsToPushLeft.addAll(producedVars);
                     } else if (joinLiveVarsRight.containsAll(usedVars)) {
@@ -149,6 +151,12 @@
             return false;
         }
         if (needToPushOps) {
+            //We should push independent ops into the first branch that the selection depends on
+            if (intersectsBranch[0]) {
+                pushOps(pushedOnEither, joinBranchLeftRef, context);
+            } else {
+                pushOps(pushedOnEither, joinBranchRightRef, context);
+            }
             pushOps(pushedOnLeft, joinBranchLeftRef, context);
             pushOps(pushedOnRight, joinBranchRightRef, context);
         }
@@ -226,8 +234,8 @@
             if (cond.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                 AbstractFunctionCallExpression fcond = (AbstractFunctionCallExpression) cond;
                 if (fcond.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.AND)) {
-                    AbstractFunctionCallExpression newCond = new ScalarFunctionCallExpression(context
-                            .getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.AND));
+                    AbstractFunctionCallExpression newCond = new ScalarFunctionCallExpression(
+                            context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.AND));
                     newCond.getArguments().add(select.getCondition());
                     newCond.getArguments().addAll(fcond.getArguments());
                     join.getCondition().setValue(newCond);
@@ -235,9 +243,9 @@
                 }
             }
             if (!bAddedToConj) {
-                AbstractFunctionCallExpression newCond = new ScalarFunctionCallExpression(context.getMetadataProvider()
-                        .lookupFunction(AlgebricksBuiltinFunctions.AND), select.getCondition(),
-                        new MutableObject<ILogicalExpression>(join.getCondition().getValue()));
+                AbstractFunctionCallExpression newCond = new ScalarFunctionCallExpression(
+                        context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.AND),
+                        select.getCondition(), new MutableObject<ILogicalExpression>(join.getCondition().getValue()));
                 join.getCondition().setValue(newCond);
             }
         }
@@ -255,7 +263,7 @@
 
     /**
      * Whether the expression contains a not-null filtering
-     * 
+     *
      * @param expr
      * @return true if the expression contains a not-null filtering function call; false otherwise.
      */
@@ -288,7 +296,7 @@
 
     /**
      * Whether the expression contains a null filtering
-     * 
+     *
      * @param expr
      * @return true if the expression contains a null filtering function call; false otherwise.
      */
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index 50b4ea9..21e9c06 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -36,7 +36,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -80,19 +79,41 @@
         Mutable<ILogicalOperator> tupleSourceOpRef = currentOpRef;
         currentOpRef = opRef;
         if (tupleSourceOpRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) tupleSourceOpRef.getValue();
-            // If the subplan input is a trivial plan, do not do the rewriting.
-            if (nts.getSourceOperator().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
-                while (currentOpRef.getValue().getInputs().size() == 1
-                        && currentOpRef.getValue() instanceof AbstractScanOperator
-                        && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
-                    if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
-                        /** move down the boundary if the operator is independent of the tuple source */
-                        boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
-                    } else {
-                        break;
-                    }
-                    currentOpRef = currentOpRef.getValue().getInputs().get(0);
+            while (currentOpRef.getValue().getInputs().size() == 1
+                    /*
+                     * When this rule is fired,
+                     * Unnests with a dataset function have been rewritten to DataSourceScans and
+                     * AccessMethod related rewriting hasn't been done. Therefore, we only need
+                     * to check if currentOpRef holds a DataSourceScanOperator.
+                     */
+                    && currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN
+                    && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
+                if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
+                    /** move down the boundary if the operator is independent of the tuple source */
+                    boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+                } else {
+                    break;
+                }
+                currentOpRef = currentOpRef.getValue().getInputs().get(0);
+            }
+        } else {
+            //Move the boundary below any top const assigns.
+            boundaryOpRef = opRef.getValue().getInputs().get(0);
+            while (boundaryOpRef.getValue().getInputs().size() == 1
+                    /*
+                     * When this rule is fired,
+                     * Unnests with a dataset function have been rewritten to DataSourceScans and
+                     * AccessMethod related rewriting hasn't been done. Therefore, we only need
+                     * to check if boundaryOpRef holds a DataSourceScanOperator.
+                     */
+                    && boundaryOpRef.getValue().getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+                List<LogicalVariable> opUsedVars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getUsedVariables(boundaryOpRef.getValue(), opUsedVars);
+                if (opUsedVars.size() == 0) {
+                    // move down the boundary if the operator is a const assigns.
+                    boundaryOpRef = boundaryOpRef.getValue().getInputs().get(0);
+                } else {
+                    break;
                 }
             }
         }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java
similarity index 92%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java
index 7327e94..32c7e03 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanRule.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.LinkedList;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,7 +30,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
@@ -46,12 +45,11 @@
 
     /**
      * Eliminate Subplan above ETS
-     * and Subplan that has only ops. with one input and no free vars. (could we
-     * modify it to consider free vars which are sources of Unnest or Assign, if
-     * there are no aggregates?)
+     * and Subplan that has only ops. with one input and no free vars.
      */
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
             return false;
@@ -117,8 +115,8 @@
                 if (topOp == null) {
                     topOp = r.getValue();
                 } else {
-                    LeftOuterJoinOperator j = new LeftOuterJoinOperator(new MutableObject<ILogicalExpression>(
-                            ConstantExpression.TRUE));
+                    InnerJoinOperator j = new InnerJoinOperator(
+                            new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
                     j.getInputs().add(new MutableObject<ILogicalOperator>(topOp));
                     j.getInputs().add(r);
                     ctx.setOutputTypeEnvironment(j, j.computeOutputTypeEnvironment(ctx));
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
similarity index 99%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
index df6e1ee..f7b6dea 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
similarity index 94%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
index 67e8d42..3e65d91 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceGroupByForSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceGroupByForSubplanRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,7 +29,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -70,7 +69,7 @@
  *
  * <pre>
  * Before
- * 
+ *
  *   plan__parent
  *   SUBPLAN {
  *     PROJECT?
@@ -80,11 +79,11 @@
  *       plan__nested_B
  *   }
  *   plan__child
- * 
+ *
  *   where $condition does not equal a constant true.
- * 
+ *
  * After (This is a general application of the rule, specifics may vary based on the query plan.)
- * 
+ *
  *   plan__parent
  *   GROUP_BY {
  *     PROJECT?
@@ -106,7 +105,8 @@
 public class IntroduceGroupByForSubplanRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -188,9 +188,18 @@
         Set<LogicalVariable> pkVars = computeGbyVars(outerNts, free, context);
         if (pkVars == null || pkVars.size() < 1) {
             // there is no non-trivial primary key, group-by keys are all live variables
+            // that were produced by descendant or self
             ILogicalOperator subplanInput = subplan.getInputs().get(0).getValue();
             pkVars = new HashSet<LogicalVariable>();
+            //get live variables
             VariableUtilities.getLiveVariables(subplanInput, pkVars);
+
+            //get produced variables
+            Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+            VariableUtilities.getProducedVariablesInDescendantsAndSelf(subplanInput, producedVars);
+
+            //retain the intersection
+            pkVars.retainAll(producedVars);
         }
         AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Found FD for introducing group-by: " + pkVars);
 
@@ -222,11 +231,13 @@
                 }
                 break;
             }
+            default:
+                break;
         }
         if (testForNull == null) {
             testForNull = context.newVar();
-            AssignOperator tmpAsgn = new AssignOperator(testForNull, new MutableObject<ILogicalExpression>(
-                    ConstantExpression.TRUE));
+            AssignOperator tmpAsgn = new AssignOperator(testForNull,
+                    new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
             tmpAsgn.getInputs().add(new MutableObject<ILogicalOperator>(rightRef.getValue()));
             rightRef.setValue(tmpAsgn);
             context.computeAndSetTypeEnvironmentForOperator(tmpAsgn);
@@ -261,9 +272,8 @@
         Map<LogicalVariable, LogicalVariable> mappedVars = buildVarExprList(pkVars, context, g, g.getGroupByList());
         context.updatePrimaryKeys(mappedVars);
         for (LogicalVariable uv : underVars) {
-            g.getDecorList().add(
-                    new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null, new MutableObject<ILogicalExpression>(
-                            new VariableReferenceExpression(uv))));
+            g.getDecorList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(uv))));
         }
         OperatorPropertiesUtil.typeOpRec(subplanRoot, context);
         OperatorPropertiesUtil.typeOpRec(gPlan.getRoots().get(0), context);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertOuterJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
similarity index 96%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertOuterJoinRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
index b8c75bd..daf27ac 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InsertOuterJoinRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.Iterator;
 
@@ -34,7 +34,7 @@
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
-public class InsertOuterJoinRule implements IAlgebraicRewriteRule {
+public class IntroduceLeftOuterJoinForSubplanRule implements IAlgebraicRewriteRule {
 
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/MoveFreeVariableOperatorOutOfSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java
similarity index 97%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/MoveFreeVariableOperatorOutOfSubplanRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java
index 06063b8..959466a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/MoveFreeVariableOperatorOutOfSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/MoveFreeVariableOperatorOutOfSubplanRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.HashSet;
 import java.util.ListIterator;
@@ -34,6 +34,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import org.apache.hyracks.algebricks.rewriter.rules.AbstractDecorrelationRule;
 
 /**
  * The rule searches for operators that can be moved outside the subplan.
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
similarity index 98%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
index f8a93b0..35c7e4e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/NestedSubplanToJoinRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.ArrayList;
 import java.util.HashSet;
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
similarity index 99%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index 9dce6c9..74ff21f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -18,7 +18,7 @@
  * under the License.
  */
 
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.ArrayList;
 import java.util.HashSet;
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SubplanOutOfGroupRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/SubplanOutOfGroupRule.java
similarity index 98%
rename from algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SubplanOutOfGroupRule.java
rename to algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/SubplanOutOfGroupRule.java
index d0d3dd7..0aba194 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SubplanOutOfGroupRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/SubplanOutOfGroupRule.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.algebricks.rewriter.rules;
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
 
 import java.util.ArrayList;
 import java.util.Iterator;