Merged hyracks_lsm_tree r2452:r2463.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2464 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 3a82ccd..8fed84f 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;

+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

@@ -33,6 +34,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;

@@ -49,13 +51,17 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;

+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;

 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;

 

 public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {

@@ -106,8 +112,49 @@
     }

 

     @Override

-    public Void visitExchangeOperator(ExchangeOperator op, Void arg) {

-        // does not use any variable

+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {

+        // Used variables depend on the physical operator.

+        if (op.getPhysicalOperator() != null) {

+            IPhysicalOperator physOp = op.getPhysicalOperator();

+            switch (physOp.getOperatorTag()) {

+                case BROADCAST_EXCHANGE:

+                case ONE_TO_ONE_EXCHANGE:

+                case RANDOM_MERGE_EXCHANGE: {

+                    // No variables used.

+                    break;

+                }

+                case HASH_PARTITION_EXCHANGE: {

+                    HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;

+                    usedVariables.addAll(concreteOp.getHashFields());

+                    break;

+                }

+                case HASH_PARTITION_MERGE_EXCHANGE: {

+                    HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;

+                    usedVariables.addAll(concreteOp.getPartitionFields());

+                    for (OrderColumn orderCol : concreteOp.getOrderColumns()) {

+                        usedVariables.add(orderCol.getColumn());

+                    }

+                    break;

+                }

+                case SORT_MERGE_EXCHANGE: {

+                    SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;

+                    for (OrderColumn orderCol : concreteOp.getSortColumns()) {

+                        usedVariables.add(orderCol.getColumn());

+                    }

+                    break;

+                }

+                case RANGE_PARTITION_EXCHANGE: {

+                    RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;

+                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {

+                        usedVariables.add(partCol.getColumn());

+                    }

+                    break;

+                }

+                default: {

+                    throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");

+                }

+            }

+        }

         return null;

     }

 

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 25f22e7..f66f99b 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -61,14 +61,22 @@
             ITypingContext ctx) throws AlgebricksException {

         substituteVariables(op, v1, v2, true, ctx);

     }

-

+    

+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,

+            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {

+        for (Mutable<ILogicalOperator> childOp : op.getInputs()) {

+            substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);

+        }

+        substituteVariables(op, v1, v2, true, ctx);

+    }

+    

     public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,

             boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {

         ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(

                 goThroughNts, ctx);

         op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));

     }

-

+    

     public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {

         Set<T> varSet = new HashSet<T>();

         Set<T> varArgSet = new HashSet<T>();

diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index f3c9e5a..61d4880 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -158,5 +158,21 @@
                 comparatorFactories);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
+
+    /**
+     * Returns the partitioning variables of this exchange.
+     * The returned list is the operator's own list, not a copy.
+     */
+    public List<LogicalVariable> getPartitionFields() {
+        return this.partitionFields;
+    }
+
+    /**
+     * Returns the order columns of this exchange.
+     * The returned list is the operator's own list, not a copy.
+     */
+    public List<OrderColumn> getOrderColumns() {
+        return this.orderColumns;
+    }
 
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index 7e3c935..8875f6c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -69,5 +70,13 @@
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
         throw new NotImplementedException();
     }
+
+    /**
+     * Returns the order columns this range-partition exchange partitions on.
+     * The returned list is the operator's own list, not a copy.
+     */
+    public List<OrderColumn> getPartitioningFields() {
+        return this.partitioningFields;
+    }
 
 }
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
index d54833e..431fca1 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
@@ -87,6 +87,7 @@
         projectOp.getInputs().add(new MutableObject<ILogicalOperator>(parentOp));
         opUnion.getInputs().get(branch).setValue(projectOp);
         projectOp.setPhysicalOperator(new StreamProjectPOperator());
+        context.computeAndSetTypeEnvironmentForOperator(projectOp);
         context.computeAndSetTypeEnvironmentForOperator(parentOp);
     }
 
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
new file mode 100644
index 0000000..a057f4f
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Projects away unused variables at the earliest possible point.
+ * Does a full DFS sweep of the plan adding ProjectOperators in the bottom-up pass.
+ * Also, removes projects that have become useless.
+ * The sweep runs at most once per rule instance (see {@link #hasRun}).
+ * TODO: This rule 'recklessly' adds as many projects as possible, but there is no guarantee
+ * that the overall cost of the plan is reduced since project operators also add a cost.
+ */
+public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
+
+    // Scratch collections reused at every level of the recursion. Each use finishes before the
+    // next recursive call is made (or starts after it returns), so sharing them is safe; it also
+    // means a rule instance must not be used concurrently.
+    private final Set<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
+    private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+    private final Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+    private final List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
+    // Set after the first invocation so that the full-plan sweep is only done once.
+    protected boolean hasRun = false;
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    /**
+     * Performs the full-plan sweep on the first call, using {@code opRef} as the plan root.
+     * Subsequent calls return false without doing anything.
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
+        hasRun = true;
+        return introduceProjects(null, -1, opRef, Collections.<LogicalVariable> emptySet(), context);
+    }
+
+    /**
+     * Recursively introduces projects below the operator in {@code opRef} and removes projects that
+     * have become useless.
+     *
+     * @param parentOp
+     *            the parent of the operator in {@code opRef}, or {@code null} for the plan root
+     * @param parentInputIndex
+     *            position of {@code opRef} in {@code parentOp}'s inputs ({@code -1} for the plan root)
+     * @param opRef
+     *            reference to the operator to process
+     * @param parentUsedVars
+     *            variables used by the ancestors of the operator
+     * @param context
+     *            optimization context, used to recompute type environments
+     * @return true if the plan was modified
+     */
+    protected boolean introduceProjects(AbstractLogicalOperator parentOp, int parentInputIndex,
+            Mutable<ILogicalOperator> opRef, Set<LogicalVariable> parentUsedVars, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+
+        // In the top-down pass, maintain a set of variables that are used in op and all its parents.
+        Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+        parentsUsedVars.addAll(parentUsedVars);
+        parentsUsedVars.addAll(usedVars);
+
+        // Descend into children.
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(i);
+            if (introduceProjects(op, i, inputOpRef, parentsUsedVars, context)) {
+                modified = true;
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+        // In the bottom-up pass, determine which live variables are not used by op's parents.
+        // Such variables are projected away.
+        liveVars.clear();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        producedVars.clear();
+        VariableUtilities.getProducedVariables(op, producedVars);
+        liveVars.removeAll(producedVars);
+
+        projectVars.clear();
+        for (LogicalVariable liveVar : liveVars) {
+            if (parentsUsedVars.contains(liveVar)) {
+                projectVars.add(liveVar);
+            }
+        }
+
+        // Some of the variables that are live at this op are not used above.
+        if (projectVars.size() != liveVars.size()) {
+            // Add a project operator under each of op's qualifying input branches.
+            for (int i = 0; i < op.getInputs().size(); i++) {
+                ILogicalOperator childOp = op.getInputs().get(i).getValue();
+                liveVars.clear();
+                VariableUtilities.getLiveVariables(childOp, liveVars);
+                List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+                vars.addAll(projectVars);
+                // Only retain those variables that are live in the i-th input branch.
+                vars.retainAll(liveVars);
+                if (vars.size() != liveVars.size()) {
+                    ProjectOperator projectOp = new ProjectOperator(vars);
+                    projectOp.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
+                    op.getInputs().get(i).setValue(projectOp);
+                    context.computeAndSetTypeEnvironmentForOperator(projectOp);
+                    modified = true;
+                }
+            }
+        } else if (op.getOperatorTag() == LogicalOperatorTag.PROJECT && parentOp != null) {
+            // Check if the existing project has become useless. A project at the plan root has no
+            // parent to re-link, so it is always kept.
+            liveVars.clear();
+            VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), liveVars);
+            ProjectOperator projectOp = (ProjectOperator) op;
+            List<LogicalVariable> projectOpVars = projectOp.getVariables();
+            if (liveVars.size() == projectOpVars.size() && liveVars.containsAll(projectOpVars)) {
+                boolean eliminateProject = true;
+                // For UnionAll the variables must also be in exactly the correct order.
+                if (parentOp.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+                    eliminateProject = canEliminateProjectBelowUnion((UnionAllOperator) parentOp, projectOp,
+                            parentInputIndex);
+                }
+                if (eliminateProject) {
+                    // The existing project has become useless. Remove it.
+                    parentOp.getInputs().get(parentInputIndex).setValue(op.getInputs().get(0).getValue());
+                    // Report the change so that the parent recomputes its type environment.
+                    modified = true;
+                }
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+        return modified;
+    }
+
+    /**
+     * Checks whether the project feeding input {@code unionInputIndex} of {@code unionOp} can be removed
+     * without changing the variable order expected by the union's variable mappings.
+     *
+     * @return true if the project's input yields its live variables in exactly the order of the union's
+     *         mappings for that input branch
+     */
+    private boolean canEliminateProjectBelowUnion(UnionAllOperator unionOp, ProjectOperator projectOp,
+            int unionInputIndex) throws AlgebricksException {
+        List<LogicalVariable> orderedLiveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(projectOp.getInputs().get(0).getValue(), orderedLiveVars);
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMappings = unionOp.getVariableMappings();
+        // A count mismatch cannot line up position by position (and would otherwise risk an index error).
+        if (orderedLiveVars.size() != varMappings.size()) {
+            return false;
+        }
+        for (int i = 0; i < orderedLiveVars.size(); i++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varTriple = varMappings.get(i);
+            LogicalVariable expectedVar = (unionInputIndex == 0) ? varTriple.first : varTriple.second;
+            if (expectedVar != orderedLiveVars.get(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
new file mode 100644
index 0000000..1bcf95a
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
@@ -0,0 +1,164 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes an AssignOperator below a UnionAll operator by creating a new AssignOperator below each of
+ * the UnionAllOperator's branches with appropriate variable replacements.
+ * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator,
+ * for example, eliminating common sub-expressions.
+ *
+ * Example:
+ *
+ * Before plan:
+ * ...
+ * assign [$$20, $$21] <- [funcA($$3), funcB($$6)]
+ *   union ($$1, $$2, $$3) ($$4, $$5, $$6)
+ *     union_branch_0
+ *       ...
+ *     union_branch_1
+ *       ...
+ *
+ * After plan:
+ * ...
+ * union ($$1, $$2, $$3) ($$4, $$5, $$6) ($$22, $$24, $$20) ($$23, $$25, $$21)
+ *   assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
+ *     union_branch_0
+ *       ...
+ *   assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
+ *     union_branch_1
+ *       ...
+ */
+public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * If an input of the operator in {@code opRef} is an assign sitting directly on a UnionAll, replaces
+     * that assign by one clone below each UnionAll branch and extends the union's variable mappings.
+     *
+     * @return true if at least one assign was pushed below a UnionAll
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (!op.hasInputs()) {
+            return false;
+        }
+
+        boolean modified = false;
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+            if (childOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+                continue;
+            }
+            AssignOperator assignOp = (AssignOperator) childOp;
+
+            AbstractLogicalOperator childOfChildOp = (AbstractLogicalOperator) assignOp.getInputs().get(0).getValue();
+            if (childOfChildOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
+                continue;
+            }
+            UnionAllOperator unionOp = (UnionAllOperator) childOfChildOp;
+
+            Set<LogicalVariable> assignUsedVars = new HashSet<LogicalVariable>();
+            VariableUtilities.getUsedVariables(assignOp, assignUsedVars);
+
+            List<LogicalVariable> assignVars = assignOp.getVariables();
+
+            AssignOperator[] newAssignOps = new AssignOperator[2];
+            for (int j = 0; j < unionOp.getInputs().size(); j++) {
+                newAssignOps[j] = createAssignBelowUnionAllBranch(unionOp, j, assignOp, assignUsedVars, context);
+            }
+            // Add original assign variables to the union variable mappings.
+            for (int j = 0; j < assignVars.size(); j++) {
+                LogicalVariable first = newAssignOps[0].getVariables().get(j);
+                LogicalVariable second = newAssignOps[1].getVariables().get(j);
+                Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(
+                        first, second, assignVars.get(j));
+                unionOp.getVariableMappings().add(varMapping);
+            }
+            context.computeAndSetTypeEnvironmentForOperator(unionOp);
+
+            // Remove original assign operator.
+            op.getInputs().set(i, assignOp.getInputs().get(0));
+            context.computeAndSetTypeEnvironmentForOperator(op);
+            modified = true;
+        }
+
+        return modified;
+    }
+
+    /**
+     * Creates a clone of {@code originalAssignOp} on top of input {@code inputIndex} of {@code unionOp}, with
+     * every union output variable used by the assign replaced by the matching variable of that branch.
+     *
+     * @return the new assign operator, already installed as the union's {@code inputIndex}-th input
+     */
+    private AssignOperator createAssignBelowUnionAllBranch(UnionAllOperator unionOp, int inputIndex,
+            AssignOperator originalAssignOp, Set<LogicalVariable> assignUsedVars, IOptimizationContext context)
+            throws AlgebricksException {
+        AssignOperator newAssignOp = cloneAssignOperator(originalAssignOp, context);
+        newAssignOp.getInputs()
+                .add(new MutableObject<ILogicalOperator>(unionOp.getInputs().get(inputIndex).getValue()));
+        unionOp.getInputs().get(inputIndex).setValue(newAssignOp);
+        int numVarMappings = unionOp.getVariableMappings().size();
+        for (int i = 0; i < numVarMappings; i++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = unionOp.getVariableMappings().get(i);
+            if (assignUsedVars.contains(varMapping.third)) {
+                LogicalVariable replacementVar;
+                if (inputIndex == 0) {
+                    replacementVar = varMapping.first;
+                } else {
+                    replacementVar = varMapping.second;
+                }
+                VariableUtilities.substituteVariables(newAssignOp, varMapping.third, replacementVar, context);
+            }
+        }
+        // Compute the type environment only after substitution, when the expressions refer to the
+        // branch's variables instead of the union's output variables (which are not live below the union).
+        context.computeAndSetTypeEnvironmentForOperator(newAssignOp);
+        return newAssignOp;
+    }
+
+    /**
+     * Clones the given assign operator, giving the clone fresh output variables (from
+     * {@code context.newVar()}) and clones of the original's expressions.
+     * The execution mode is copied; the inputs of the clone are left empty.
+     */
+    private AssignOperator cloneAssignOperator(AssignOperator assignOp, IOptimizationContext context) {
+        List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> exprs = new ArrayList<Mutable<ILogicalExpression>>();
+        int numVars = assignOp.getVariables().size();
+        for (int i = 0; i < numVars; i++) {
+            vars.add(context.newVar());
+            exprs.add(new MutableObject<ILogicalExpression>(assignOp.getExpressions().get(i).getValue()
+                    .cloneExpression()));
+        }
+        AssignOperator assignCloneOp = new AssignOperator(vars, exprs);
+        assignCloneOp.setExecutionMode(assignOp.getExecutionMode());
+        return assignCloneOp;
+    }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
index 54db2c2..ec57be5 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
@@ -24,6 +24,7 @@
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -39,6 +40,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -136,6 +138,11 @@
             if (replaceProjectVars((ProjectOperator) op)) {
                 modified = true;
             }
+        } else if(op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+            // Replace redundant variables manually in the UnionAll operator.
+            if (replaceUnionAllVars((UnionAllOperator) op)) {
+                modified = true;
+            }
         } else {
             if (op.acceptExpressionTransform(substVisitor)) {
                 modified = true;
@@ -227,6 +234,29 @@
         return modified;
     }
 
+    /**
+     * Replaces the input variables in the given UnionAll's variable mappings with their equivalence-class
+     * representatives. Returns true only if at least one variable actually changed.
+     */
+    private boolean replaceUnionAllVars(UnionAllOperator op) throws AlgebricksException {
+        boolean modified = false;
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : op.getVariableMappings()) {
+            List<LogicalVariable> firstEquivalentVars = equivalentVarsMap.get(varMapping.first);
+            List<LogicalVariable> secondEquivalentVars = equivalentVarsMap.get(varMapping.second);
+            // Replace variables with their representative (the first element of the equivalence list),
+            // but only report a modification if the variable is not already the representative.
+            if (firstEquivalentVars != null && firstEquivalentVars.get(0) != varMapping.first) {
+                varMapping.first = firstEquivalentVars.get(0);
+                modified = true;
+            }
+            if (secondEquivalentVars != null && secondEquivalentVars.get(0) != varMapping.second) {
+                varMapping.second = secondEquivalentVars.get(0);
+                modified = true;
+            }
+        }
+        return modified;
+    }
+    
     private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
         @Override
         public boolean transform(Mutable<ILogicalExpression> exprRef) {
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index c53ea0a..e0c2741 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -23,6 +23,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -33,10 +34,14 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+/**
+ * Removes unused variables from Assign, Unnest, Aggregate, and UnionAll operators.
+ */
 public class RemoveUnusedAssignAndAggregateRule implements IAlgebraicRewriteRule {
 
     @Override
@@ -55,7 +60,7 @@
         if (smthToRemove) {
             removeUnusedAssigns(opRef, toRemove, context);
         }
-        return smthToRemove;
+        return !toRemove.isEmpty();
     }
 
     private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> toRemove,
@@ -87,28 +92,59 @@
 
     private int removeFromAssigns(AbstractLogicalOperator op, Set<LogicalVariable> toRemove,
             IOptimizationContext context) throws AlgebricksException {
-        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-            AssignOperator assign = (AssignOperator) op;
-            if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
-                context.computeAndSetTypeEnvironmentForOperator(assign);
+        switch (op.getOperatorTag()) { // Prune toRemove vars; return remaining var count, or -1 (UNNEST/other ops).
+            case ASSIGN: {
+                AssignOperator assign = (AssignOperator) op;
+                if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
+                    context.computeAndSetTypeEnvironmentForOperator(assign);
+                }
+                return assign.getVariables().size(); // Variables still assigned after pruning.
             }
-            return assign.getVariables().size();
-        } else if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
-            AggregateOperator agg = (AggregateOperator) op;
-            if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
-                context.computeAndSetTypeEnvironmentForOperator(agg);
+            case AGGREGATE: {
+                AggregateOperator agg = (AggregateOperator) op;
+                if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
+                    context.computeAndSetTypeEnvironmentForOperator(agg);
+                }
+                return agg.getVariables().size(); // Aggregate variables still defined after pruning.
             }
-            return agg.getVariables().size();
-        } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
-            UnnestOperator uOp = (UnnestOperator) op;
-            LogicalVariable pVar = uOp.getPositionalVariable();
-            if (pVar != null && toRemove.contains(pVar)) {
-                uOp.setPositionalVariable(null);
+            case UNNEST: { // Only the positional variable can be removed here.
+                UnnestOperator uOp = (UnnestOperator) op;
+                LogicalVariable pVar = uOp.getPositionalVariable();
+                if (pVar != null && toRemove.contains(pVar)) {
+                    uOp.setPositionalVariable(null);
+                }
+                break; // Leaves the switch; UNNEST reports -1 below.
+            }
+            case UNIONALL: { // Drops mappings whose third var is in toRemove.
+                UnionAllOperator unionOp = (UnionAllOperator) op;
+                if (removeUnusedVarsFromUnionAll(unionOp, toRemove)) { // Re-type only if a mapping was dropped.
+                    context.computeAndSetTypeEnvironmentForOperator(unionOp);
+                }
+                return unionOp.getVariableMappings().size(); // Mappings that remain.
             }
         }
         return -1;
     }
 
+    private boolean removeUnusedVarsFromUnionAll(UnionAllOperator unionOp, Set<LogicalVariable> toRemove) {
+        Iterator<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> iter = unionOp.getVariableMappings()
+                .iterator(); // Drop each (first, second, third) mapping whose third var is in toRemove.
+        boolean modified = false; // Returned: true iff at least one mapping was dropped.
+        Set<LogicalVariable> removeFromRemoveSet = new HashSet<LogicalVariable>(); // Vars to take back out of toRemove.
+        while (iter.hasNext()) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = iter.next();
+            if (toRemove.contains(varMapping.third)) {
+                iter.remove(); // Removes the mapping from the operator's list in place.
+                modified = true;
+            }
+            // Whether or not the mapping was dropped, keep its first/second vars out of toRemove.
+            removeFromRemoveSet.add(varMapping.first);
+            removeFromRemoveSet.add(varMapping.second);
+        }
+        toRemove.removeAll(removeFromRemoveSet); // Mutates the caller's set.
+        return modified;
+    }
+
     private boolean removeUnusedVarsAndExprs(Set<LogicalVariable> toRemove, List<LogicalVariable> varList,
             List<Mutable<ILogicalExpression>> exprList) {
         boolean changed = false;
@@ -142,22 +178,41 @@
                 }
             }
         }
-        if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
-            AssignOperator assign = (AssignOperator) op;
-            toRemove.addAll(assign.getVariables());
-        } else if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
-            AggregateOperator agg = (AggregateOperator) op;
-            toRemove.addAll(agg.getVariables());
-        } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
-            UnnestOperator uOp = (UnnestOperator) op;
-            LogicalVariable pVar = uOp.getPositionalVariable();
-            if (pVar != null) {
-                toRemove.add(pVar);
+        boolean removeUsedVars = true;
+        switch (op.getOperatorTag()) {
+            case ASSIGN: {
+                AssignOperator assign = (AssignOperator) op;
+                toRemove.addAll(assign.getVariables());
+                break;
+            }
+            case AGGREGATE: {
+                AggregateOperator agg = (AggregateOperator) op;
+                toRemove.addAll(agg.getVariables());
+                break;
+            }
+            case UNNEST: {
+                UnnestOperator uOp = (UnnestOperator) op;
+                LogicalVariable pVar = uOp.getPositionalVariable();
+                if (pVar != null) {
+                    toRemove.add(pVar);
+                }
+                break;
+            }
+            case UNIONALL: {
+                UnionAllOperator unionOp = (UnionAllOperator) op;
+                for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : unionOp
+                        .getVariableMappings()) {
+                    toRemove.add(varMapping.third);
+                }
+                removeUsedVars = false;
+                break;
             }
         }
-        List<LogicalVariable> used = new LinkedList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(op, used);
-        toRemove.removeAll(used);
+        if (removeUsedVars) {
+            List<LogicalVariable> used = new LinkedList<LogicalVariable>();
+            VariableUtilities.getUsedVariables(op, used);
+            toRemove.removeAll(used);
+        }
     }
 
 }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java
index 2f4131d..7f32f6d 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeOpContext.java
@@ -75,15 +75,21 @@
             IBinaryComparatorFactory[] cmpFactories, IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
         this.accessor = accessor;
-        this.cmp = MultiComparator.create(cmpFactories);
+        
+        if (cmpFactories[0] != null) {
+            this.cmp = MultiComparator.create(cmpFactories);
+        } else {
+            this.cmp = null;
+        }
+        
         this.leafFrameFactory = leafFrameFactory;
         this.leafFrame = (IBTreeLeafFrame) leafFrameFactory.createFrame();
-        if (leafFrame != null) {
+        if (leafFrame != null && this.cmp != null) {
             leafFrame.setMultiComparator(cmp);
         }
         this.interiorFrameFactory = interiorFrameFactory;
         this.interiorFrame = (IBTreeInteriorFrame) interiorFrameFactory.createFrame();
-        if (interiorFrame != null) {
+        if (interiorFrame != null && this.cmp != null) {
             interiorFrame.setMultiComparator(cmp);
         }
         this.metaFrame = metaFrame;
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 78e0848..04997b5 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.btree.impls;
 
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext;
@@ -43,16 +44,21 @@
     public LSMBTreeOpContext(BTree memBTree, ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
-        this.cmp = MultiComparator.create(memBTree.getComparatorFactories());
+        IBinaryComparatorFactory cmpFactories[] = memBTree.getComparatorFactories();
+        if (cmpFactories[0] != null) {
+            this.cmp = MultiComparator.create(memBTree.getComparatorFactories());
+        } else {
+            this.cmp = null;
+        }
         this.memBTree = memBTree;
         this.insertLeafFrameFactory = insertLeafFrameFactory;
         this.deleteLeafFrameFactory = deleteLeafFrameFactory;
         this.insertLeafFrame = (IBTreeLeafFrame) insertLeafFrameFactory.createFrame();
         this.deleteLeafFrame = (IBTreeLeafFrame) deleteLeafFrameFactory.createFrame();
-        if (insertLeafFrame != null) {
+        if (insertLeafFrame != null && this.cmp != null) {
             insertLeafFrame.setMultiComparator(cmp);
         }
-        if (deleteLeafFrame != null) {
+        if (deleteLeafFrame != null && this.cmp != null) {
             deleteLeafFrame.setMultiComparator(cmp);
         }
         this.modificationCallback = modificationCallback;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index 135db80..52e9c32 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -44,7 +44,7 @@
             IIndexDataflowHelperFactory dataflowHelperFactory, boolean retainInput,
             ISearchOperationCallbackFactory searchOpCallbackFactory) {
         super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory, null, false, NoOpLocalResourceFactoryProvider.INSTANCE,
+                comparatorFactories, dataflowHelperFactory, null, retainInput, NoOpLocalResourceFactoryProvider.INSTANCE,
                 searchOpCallbackFactory, NoOpOperationCallbackFactory.INSTANCE);
 
         this.keyFields = keyFields;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index 6ff55bc..219ab30 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -53,7 +53,13 @@
     public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame,
             ITreeIndexMetaDataFrame metaFrame, IBinaryComparatorFactory[] cmpFactories, int treeHeightHint,
             IModificationOperationCallback modificationCallback) {
-        this.cmp = MultiComparator.create(cmpFactories);
+        
+        if (cmpFactories[0] != null) { 
+            this.cmp = MultiComparator.create(cmpFactories);
+        } else {
+            this.cmp = null;
+        }
+        
         this.interiorFrame = interiorFrame;
         this.leafFrame = leafFrame;
         this.metaFrame = metaFrame;