Fix ASTERIXDB-1205: cleanup union related rules.

-fixed EliminateSubplanWithInputCardinalityOneRule for general cases;
-fixed the type inference invocation in PushAssignBelowUnionAllRule;
-factored out ITypingContext and IVariableContext so that the operator deep copy
 visitor can also be used in the language translator without types;
-added a rule to remove Cartesian product with an ETS input.

Change-Id: I7ad982108a4dc119249222c8ebb9e5897d93783c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/628
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 813e58f..24578e6 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -21,7 +21,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableEvalSizeEnvironment;
@@ -33,13 +32,7 @@
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 
-public interface IOptimizationContext extends ITypingContext {
-
-    public abstract int getVarCounter();
-
-    public abstract void setVarCounter(int varCounter);
-
-    public abstract LogicalVariable newVar();
+public interface IOptimizationContext extends ITypingContext, IVariableContext {
 
     @Override
     public abstract IMetadataProvider<?, ?> getMetadataProvider();
@@ -87,10 +80,6 @@
 
     public abstract PhysicalOptimizationConfig getPhysicalOptimizationConfig();
 
-    public abstract void invalidateTypeEnvironmentForOperator(ILogicalOperator op);
-
-    public abstract void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException;
-
     public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
 
     public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IVariableContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IVariableContext.java
new file mode 100644
index 0000000..e308a8d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IVariableContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.base;
+
+public interface IVariableContext {
+
+    public int getVarCounter();
+
+    public void setVarCounter(int varCounter);
+
+    public LogicalVariable newVar();
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
new file mode 100644
index 0000000..7b304ae
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * A visitor that provides the basic inference of tuple cardinalities of an operator's
+ * output.
+ * There are only two cases:
+ * 1. the cardinality is one in the worst case;
+ * 2. the cardinality is some unknown value.
+ */
+public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long, Void> {
+    private static final Long ONE = 1L;
+    private static final Long UNKNOWN = 1000L;
+
+    @Override
+    public Long visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        return ONE;
+    }
+
+    @Override
+    public Long visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+        // Empty tuple source sends one empty tuple to kick off the pipeline.
+        return ONE;
+    }
+
+    @Override
+    public Long visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        // This is only a worst-case estimate
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        return visitJoin(op, arg);
+    }
+
+    @Override
+    public Long visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        return visitJoin(op, arg);
+    }
+
+    @Override
+    public Long visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+        return ONE;
+    }
+
+    @Override
+    public Long visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        // This is only a worst-case inference.
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+            throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
+    @Override
+    public Long visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        return UNKNOWN;
+    }
+
+    @Override
+    public Long visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        long cardinality = UNKNOWN;
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            Long branchCardinality = inputOpRef.getValue().accept(this, arg);
+            if (branchCardinality < cardinality) {
+                cardinality = branchCardinality;
+            }
+        }
+        return cardinality;
+    }
+
+    private long visitJoin(ILogicalOperator op, Void arg) throws AlgebricksException {
+        long cardinality = 1L;
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            cardinality *= inputOpRef.getValue().accept(this, arg);
+        }
+        if (cardinality > ONE) {
+            cardinality = UNKNOWN;
+        }
+        return cardinality;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index da252d4..7f50db0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -26,7 +26,7 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IVariableContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
@@ -40,13 +40,13 @@
 
 public class LogicalExpressionDeepCopyWithNewVariablesVisitor
         implements ILogicalExpressionVisitor<ILogicalExpression, Void> {
-    private final IOptimizationContext context;
+    private final IVariableContext varContext;
     private final Map<LogicalVariable, LogicalVariable> inVarMapping;
     private final Map<LogicalVariable, LogicalVariable> outVarMapping;
 
-    public LogicalExpressionDeepCopyWithNewVariablesVisitor(IOptimizationContext context,
+    public LogicalExpressionDeepCopyWithNewVariablesVisitor(IVariableContext varContext,
             Map<LogicalVariable, LogicalVariable> inVarMapping, Map<LogicalVariable, LogicalVariable> variableMapping) {
-        this.context = context;
+        this.varContext = varContext;
         this.inVarMapping = inVarMapping;
         this.outVarMapping = variableMapping;
     }
@@ -147,7 +147,7 @@
         }
         LogicalVariable varCopy = outVarMapping.get(var);
         if (varCopy == null) {
-            varCopy = context.newVar();
+            varCopy = varContext.newVar();
             outVarMapping.put(var, varCopy);
         }
         return new VariableReferenceExpression(varCopy);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 5e7ba79..3905d13 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.IVariableContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -64,6 +65,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
 
@@ -74,7 +76,8 @@
  */
 public class LogicalOperatorDeepCopyWithNewVariablesVisitor
         implements IQueryOperatorVisitor<ILogicalOperator, ILogicalOperator> {
-    private final IOptimizationContext context;
+    private final ITypingContext typeContext;
+    private final IVariableContext varContext;
     private final LogicalExpressionDeepCopyWithNewVariablesVisitor exprDeepCopyVisitor;
 
     // Key: Variable in the original plan. Value: New variable replacing the
@@ -88,11 +91,12 @@
      * @param IOptimizationContext,
      *            the optimization context
      */
-    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IOptimizationContext context) {
-        this.context = context;
+    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext) {
+        this.varContext = varContext;
+        this.typeContext = typeContext;
         this.inputVarToOutputVarMapping = new HashMap<>();
         this.outputVarToInputVarMapping = new HashMap<>();
-        this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(context,
+        this.exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext,
                 outputVarToInputVarMapping, inputVarToOutputVarMapping);
     }
 
@@ -104,12 +108,13 @@
      *            Those variables are replaced by their corresponding value in
      *            the map in the copied plan.
      */
-    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IOptimizationContext context,
+    public LogicalOperatorDeepCopyWithNewVariablesVisitor(IVariableContext varContext, ITypingContext typeContext,
             Map<LogicalVariable, LogicalVariable> inVarMapping) {
-        this.context = context;
+        this.varContext = varContext;
+        this.typeContext = typeContext;
         this.inputVarToOutputVarMapping = inVarMapping;
         this.outputVarToInputVarMapping = new HashMap<>();
-        exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(context, inVarMapping,
+        exprDeepCopyVisitor = new LogicalExpressionDeepCopyWithNewVariablesVisitor(varContext, inVarMapping,
                 inputVarToOutputVarMapping);
     }
 
@@ -123,8 +128,13 @@
     }
 
     private ILogicalOperator deepCopy(ILogicalOperator op, ILogicalOperator arg) throws AlgebricksException {
+        if (op == null) {
+            return null;
+        }
         ILogicalOperator opCopy = op.accept(this, arg);
-        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(opCopy, context);
+        if (typeContext != null) {
+            OperatorManipulationUtil.computeTypeEnvironmentBottomUp(opCopy, typeContext);
+        }
         return opCopy;
     }
 
@@ -198,7 +208,7 @@
         }
         LogicalVariable varCopy = inputVarToOutputVarMapping.get(var);
         if (varCopy == null) {
-            varCopy = context.newVar();
+            varCopy = varContext.newVar();
             inputVarToOutputVarMapping.put(var, varCopy);
         }
         return varCopy;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 3548497..55dc11a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -406,8 +406,8 @@
     }
 
     @Override
-    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Pair<LogicalVariable, LogicalVariable> pair)
-            throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().substituteVar(pair.first, pair.second);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 28f783d..8859c03 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -22,6 +22,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -38,9 +39,9 @@
      * Adds the used variables in the logical operator to the list of used variables
      *
      * @param op
-     *          The target operator
+     *            The target operator
      * @param usedVariables
-     *          A list to be filled with variables used in the logical operator op.
+     *            A list to be filled with variables used in the logical operator op.
      * @throws AlgebricksException
      */
     public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
@@ -51,10 +52,11 @@
 
     /**
      * Adds the variables produced in the logical operator in the list of produced variables
+     *
      * @param op
-     *          The target operator
+     *            The target operator
      * @param producedVariables
-     *          The variables produced in the logical operator
+     *            The variables produced in the logical operator
      * @throws AlgebricksException
      */
     public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
@@ -65,10 +67,11 @@
 
     /**
      * Adds the variables that are live after the execution of this operator to the list of schema variables.
+     *
      * @param op
-     *          The target logical operator
+     *            The target logical operator
      * @param schemaVariables
-     *          The list of live variables. The output of the operator and the propagated outputs of its children
+     *            The list of live variables. The output of the operator and the propagated outputs of its children
      * @throws AlgebricksException
      */
     public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
@@ -131,6 +134,16 @@
         substituteVariables(op, v1, v2, true, ctx);
     }
 
+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op,
+            Map<LogicalVariable, LogicalVariable> varMap, ITypingContext ctx) throws AlgebricksException {
+        for (Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+            for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
+                substituteVariablesInDescendantsAndSelf(childOp.getValue(), entry.getKey(), entry.getValue(), ctx);
+            }
+            substituteVariables(op, entry.getKey(), entry.getValue(), true, ctx);
+        }
+    }
+
     public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
             boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/AbstractTypeEnvironment.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/AbstractTypeEnvironment.java
index c4ff55b..60d0740 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/AbstractTypeEnvironment.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/AbstractTypeEnvironment.java
@@ -42,7 +42,12 @@
 
     @Override
     public Object getType(ILogicalExpression expr) throws AlgebricksException {
-        return expressionTypeComputer.getType(expr, metadataProvider, this);
+        try {
+            return expressionTypeComputer.getType(expr, metadataProvider, this);
+        } catch (Exception e) {
+            throw new AlgebricksException("Could not resolve type for " + expr.toString() + ","
+                    + "please check whether the used variables has been defined!", e);
+        }
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java
index 5f3ee3a..4d85111 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/typing/ITypingContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.typing;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import org.apache.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
@@ -25,13 +26,18 @@
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 
 public interface ITypingContext {
-    public abstract IVariableTypeEnvironment getOutputTypeEnvironment(ILogicalOperator op);
+    public IVariableTypeEnvironment getOutputTypeEnvironment(ILogicalOperator op);
 
-    public abstract void setOutputTypeEnvironment(ILogicalOperator op, IVariableTypeEnvironment env);
+    public void setOutputTypeEnvironment(ILogicalOperator op, IVariableTypeEnvironment env);
 
-    public abstract IExpressionTypeComputer getExpressionTypeComputer();
+    public IExpressionTypeComputer getExpressionTypeComputer();
 
-    public abstract INullableTypeComputer getNullableTypeComputer();
+    public INullableTypeComputer getNullableTypeComputer();
 
-    public abstract IMetadataProvider<?, ?> getMetadataProvider();
+    public IMetadataProvider<?, ?> getMetadataProvider();
+
+    public void invalidateTypeEnvironmentForOperator(ILogicalOperator op);
+
+    public void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException;
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 0aa6676..4089268 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -129,8 +129,9 @@
                     AbstractLogicalOperator inputOp = (AbstractLogicalOperator) i.getValue();
                     switch (inputOp.getExecutionMode()) {
                         case PARTITIONED: {
-                            if (forceUnpartitioned)
+                            if (forceUnpartitioned) {
                                 break;
+                            }
                             op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
                             change = true;
                             exit = true;
@@ -216,8 +217,9 @@
     public static ILogicalOperator bottomUpCopyOperators(ILogicalOperator op) throws AlgebricksException {
         ILogicalOperator newOp = deepCopy(op);
         newOp.getInputs().clear();
-        for (Mutable<ILogicalOperator> child : op.getInputs())
+        for (Mutable<ILogicalOperator> child : op.getInputs()) {
             newOp.getInputs().add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(child.getValue())));
+        }
         return newOp;
     }
 
@@ -241,7 +243,7 @@
      *            optimization context.
      * @throws AlgebricksException
      */
-    public static void computeTypeEnvironmentBottomUp(ILogicalOperator op, IOptimizationContext context)
+    public static void computeTypeEnvironmentBottomUp(ILogicalOperator op, ITypingContext context)
             throws AlgebricksException {
         for (Mutable<ILogicalOperator> children : op.getInputs()) {
             computeTypeEnvironmentBottomUp(children.getValue(), context);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 39de006..8e69cec 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -23,7 +23,6 @@
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -39,6 +38,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.CardinalityInferenceVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 
 public class OperatorPropertiesUtil {
@@ -301,4 +301,9 @@
         }
         return false;
     }
+
+    public static boolean isCardinalityOne(ILogicalOperator operator) throws AlgebricksException {
+        CardinalityInferenceVisitor visitor = new CardinalityInferenceVisitor();
+        return operator.accept(visitor, null) == 1L;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index eb5e47e..0d601b0 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 
+@SuppressWarnings({ "unchecked", "rawtypes" })
 public class AlgebricksOptimizationContext implements IOptimizationContext {
 
     private int varCounter;
@@ -118,7 +119,6 @@
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public IMetadataProvider getMetadataProvider() {
         return metadataProvider;
     }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
index 3e44173..bbb01dd 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
@@ -25,7 +25,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -66,7 +65,8 @@
 public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -127,11 +127,10 @@
 
     private AssignOperator createAssignBelowUnionAllBranch(UnionAllOperator unionOp, int inputIndex,
             AssignOperator originalAssignOp, Set<LogicalVariable> assignUsedVars, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AssignOperator newAssignOp = cloneAssignOperator(originalAssignOp, context);
         newAssignOp.getInputs()
                 .add(new MutableObject<ILogicalOperator>(unionOp.getInputs().get(inputIndex).getValue()));
-        context.computeAndSetTypeEnvironmentForOperator(newAssignOp);
         unionOp.getInputs().get(inputIndex).setValue(newAssignOp);
         int numVarMappings = unionOp.getVariableMappings().size();
         for (int i = 0; i < numVarMappings; i++) {
@@ -146,6 +145,7 @@
                 VariableUtilities.substituteVariables(newAssignOp, varMapping.third, replacementVar, context);
             }
         }
+        context.computeAndSetTypeEnvironmentForOperator(newAssignOp);
         return newAssignOp;
     }
 
@@ -159,8 +159,8 @@
         int numVars = assignOp.getVariables().size();
         for (int i = 0; i < numVars; i++) {
             vars.add(context.newVar());
-            exprs.add(new MutableObject<ILogicalExpression>(assignOp.getExpressions().get(i).getValue()
-                    .cloneExpression()));
+            exprs.add(new MutableObject<ILogicalExpression>(
+                    assignOp.getExpressions().get(i).getValue().cloneExpression()));
         }
         AssignOperator assignCloneOp = new AssignOperator(vars, exprs);
         assignCloneOp.setExecutionMode(assignOp.getExecutionMode());
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveCartesianProductWithEmptyBranchRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveCartesianProductWithEmptyBranchRule.java
new file mode 100644
index 0000000..de35fb5
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/RemoveCartesianProductWithEmptyBranchRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes Cartesian product operators that have one input branch is EmptyTupleSource.
+ */
+public class RemoveCartesianProductWithEmptyBranchRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN
+                && op.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
+            return false;
+        }
+        AbstractBinaryJoinOperator joinOperator = (AbstractBinaryJoinOperator) op;
+        ILogicalOperator left = joinOperator.getInputs().get(0).getValue();
+        ILogicalOperator right = joinOperator.getInputs().get(1).getValue();
+
+        if (!joinOperator.getCondition().getValue().equals(ConstantExpression.TRUE)) {
+            return false;
+        }
+        if (emptyBranch(left)) {
+            opRef.setValue(right);
+            return true;
+        }
+        if (emptyBranch(right)) {
+            opRef.setValue(left);
+            return true;
+        }
+        return false;
+    }
+
+    private boolean emptyBranch(ILogicalOperator op) {
+        return op.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
index 784b6cd..b01bb43 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/EliminateSubplanWithInputCardinalityOneRule.java
@@ -23,7 +23,6 @@
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -34,6 +33,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -59,7 +59,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         if (!invoked) {
             rootRef = opRef;
             invoked = true;
@@ -104,14 +105,14 @@
                 if (rootRefs.size() != 1) {
                     continue;
                 }
-                Set<Mutable<ILogicalOperator>> ntsSet = new ListSet<Mutable<ILogicalOperator>>();
-                findNts(rootRefs.get(0), ntsSet);
 
-                /** Replaces nts with the input operator of the subplan. */
-                for (Mutable<ILogicalOperator> nts : ntsSet) {
-                    nts.setValue(subplanInputOperator);
-                }
-                subplanRef.setValue(rootRefs.get(0).getValue());
+                // Replaces all Nts' in the nested plan with the Subplan input operator or its deep copy.
+                ILogicalOperator topOperator = rootRefs.get(0).getValue();
+                ReplaceNtsWithSubplanInputOperatorVisitor visitor = new ReplaceNtsWithSubplanInputOperatorVisitor(
+                        context, subplan);
+                ILogicalOperator newTopOperator = topOperator.accept(visitor, null);
+                subplanRef.setValue(newTopOperator);
+                OperatorManipulationUtil.computeTypeEnvironmentBottomUp(newTopOperator, context);
                 changed = true;
             } else {
                 continue;
@@ -140,7 +141,7 @@
     }
 
     /**
-     * Recursively adding variables which has cardinality one and in int the input free variable set.
+     * Recursively adding variables which has cardinality one into the input free variable set.
      *
      * @param opRef
      *            , the current operator reference.
@@ -155,22 +156,27 @@
      */
     private void isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars,
             Set<LogicalVariable> varsWithCardinalityOne, Set<LogicalVariable> varsLiveAtUnnestAndJoin)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractLogicalOperator operator = (AbstractLogicalOperator) opRef.getValue();
-        List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
-        VariableUtilities.getProducedVariables(operator, producedVars);
-        if (operator.getOperatorTag() == LogicalOperatorTag.UNNEST
-                || operator.getOperatorTag() == LogicalOperatorTag.INNERJOIN
-                || operator.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
-            VariableUtilities.getLiveVariables(operator, varsLiveAtUnnestAndJoin);
-        }
-        if (operator.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
-            for (LogicalVariable producedVar : producedVars) {
-                if (freeVars.contains(producedVar)) {
-                    varsWithCardinalityOne.add(producedVar);
+        List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(operator, liveVars);
+
+        if (OperatorPropertiesUtil.isCardinalityOne(operator)) {
+            for (LogicalVariable liveVar : liveVars) {
+                if (freeVars.contains(liveVar)) {
+                    varsWithCardinalityOne.add(liveVar);
                 }
             }
+        } else {
+            // Operators with the following tags could still have have cardinality one,
+            // hence they are in this "else" branch.
+            if (operator.getOperatorTag() == LogicalOperatorTag.UNNEST
+                    || operator.getOperatorTag() == LogicalOperatorTag.INNERJOIN
+                    || operator.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+                VariableUtilities.getLiveVariables(operator, varsLiveAtUnnestAndJoin);
+            }
         }
+
         if (varsWithCardinalityOne.size() == freeVars.size()) {
             return;
         }
@@ -179,25 +185,4 @@
         }
     }
 
-    /**
-     * Find the NestedTupleSource operator in the direct/undirect input operators of opRef.
-     *
-     * @param opRef
-     *            , the current operator reference.
-     * @param ntsSet
-     *            , the set NestedTupleSource operator references.
-     */
-    private void findNts(Mutable<ILogicalOperator> opRef, Set<Mutable<ILogicalOperator>> ntsSet) {
-        int childSize = opRef.getValue().getInputs().size();
-        if (childSize == 0) {
-            AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-            if (op.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-                ntsSet.add(opRef);
-            }
-            return;
-        }
-        for (Mutable<ILogicalOperator> childRef : opRef.getValue().getInputs()) {
-            findNts(childRef, ntsSet);
-        }
-    }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
new file mode 100644
index 0000000..551b5d0
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.rewriter.rules.subplan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/**
+ * This visitor replaces NTS' in a subplan with its input operator or its deep copies.
+ * Note that this visitor can only be used in the rule EliminateSubplanWithInputCardinalityOneRule,
+ * for cases where the Subplan input operator is of cardinality one and its variables are not needed after
+ * the Subplan.
+ */
+class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor<ILogicalOperator, Void> {
+    // The optimization context.
+    private final IOptimizationContext ctx;
+
+    // The input operator to the subplan.
+    private final ILogicalOperator subplanInputOperator;
+
+    // The map that maps the input variables to the subplan to their deep-copied variables.
+    private final Map<LogicalVariable, LogicalVariable> varMap = new HashMap<>();
+
+    // Whether the original copy has been used.
+    private boolean isOriginalCopyUsed = false;
+
+    /**
+     * @param context
+     *            the optimization context
+     * @param subplanOperator
+     *            the subplan operator this visitor deals with.
+     * @throws AlgebricksException
+     */
+    public ReplaceNtsWithSubplanInputOperatorVisitor(IOptimizationContext context, ILogicalOperator subplanOperator)
+            throws AlgebricksException {
+        this.ctx = context;
+        this.subplanInputOperator = subplanOperator.getInputs().get(0).getValue();
+    }
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
+            throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        if (!isOriginalCopyUsed) {
+            isOriginalCopyUsed = true;
+            return subplanInputOperator;
+        }
+        LogicalOperatorDeepCopyWithNewVariablesVisitor visitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx,
+                ctx);
+        ILogicalOperator copiedSubplanInputOperator = visitor.deepCopy(subplanInputOperator);
+        varMap.putAll(visitor.getInputToOutputVariableMapping());
+        return copiedSubplanInputOperator;
+    }
+
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
+            throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
+    public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    private ILogicalOperator visit(ILogicalOperator op) throws AlgebricksException {
+        List<Map<LogicalVariable, LogicalVariable>> varMapSnapshots = new ArrayList<>();
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            ILogicalOperator newChild = childRef.getValue().accept(this, null);
+            childRef.setValue(newChild);
+            // Replaces variables in op with the mapping obtained from one child.
+            VariableUtilities.substituteVariables(op, varMap, ctx);
+            // Keep the map from current child and move to the next child.
+            varMapSnapshots.add(new HashMap<LogicalVariable, LogicalVariable>(varMap));
+            varMap.clear();
+        }
+
+        // Combine mappings from all children.
+        for (Map<LogicalVariable, LogicalVariable> map : varMapSnapshots) {
+            varMap.putAll(map);
+        }
+
+        // Only propagates necessary mappings to the parent operator.
+        Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        varMap.values().retainAll(liveVars);
+        return op;
+    }
+
+}