Changes in this CL include:
1. fix asterixdb issue 810,
2. allow group-by logical operator to work with multiple nested plans.

Change-Id: I58ad59e7b3e8a9e14c3e14f7655c857a1890da6f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/178
Reviewed-by: Preston Carman <ecarm002@ucr.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index a456e71..a5b7074 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -101,6 +101,14 @@
         return varList;
     }
 
+    public List<LogicalVariable> getProducedGbyVarList() {
+        List<LogicalVariable> varList = new ArrayList<LogicalVariable>(gByList.size());
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {
+            varList.add(ve.first);
+        }
+        return varList;
+    }
+
     public static String veListToString(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList) {
         StringBuilder sb = new StringBuilder();
         sb.append("[");
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
index be23f48..f6f62b6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class NestedTupleSourceOperator extends AbstractLogicalOperator {
-    private final Mutable<ILogicalOperator> dataSourceReference;
+    private Mutable<ILogicalOperator> dataSourceReference;
 
     public NestedTupleSourceOperator(Mutable<ILogicalOperator> dataSourceReference) {
         this.dataSourceReference = dataSourceReference;
@@ -51,6 +51,10 @@
         return dataSourceReference;
     }
 
+    public void setDataSourceReference(Mutable<ILogicalOperator> dataSourceReference) {
+        this.dataSourceReference = dataSourceReference;
+    }
+
     @Override
     public void recomputeSchema() {
         schema = new ArrayList<LogicalVariable>();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index be83320..b9b111d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -21,7 +21,6 @@
 import java.util.Map.Entry;
 
 import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -31,7 +30,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -66,14 +64,14 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boolean, ILogicalOperator> {
 
-    private Map<LogicalVariable, LogicalVariable> variableMapping = new HashMap<LogicalVariable, LogicalVariable>();
+    private final Map<LogicalVariable, LogicalVariable> variableMapping = new HashMap<LogicalVariable, LogicalVariable>();
 
     public IsomorphismOperatorVisitor() {
     }
@@ -161,7 +159,7 @@
             for (int j = 0; j < roots.size(); j++) {
                 ILogicalOperator topOp1 = roots.get(j).getValue();
                 ILogicalOperator topOp2 = rootsArg.get(j).getValue();
-                isomorphic = this.checkBottomUp(topOp1, topOp2);
+                isomorphic = IsomorphismUtilities.isOperatorIsomorphicPlanSegment(topOp1, topOp2);
                 if (!isomorphic)
                     return Boolean.FALSE;
             }
@@ -306,7 +304,7 @@
             for (int j = 0; j < roots.size(); j++) {
                 ILogicalOperator topOp1 = roots.get(j).getValue();
                 ILogicalOperator topOp2 = rootsArg.get(j).getValue();
-                boolean isomorphic = this.checkBottomUp(topOp1, topOp2);
+                boolean isomorphic = IsomorphismUtilities.isOperatorIsomorphicPlanSegment(topOp1, topOp2);
                 if (!isomorphic)
                     return Boolean.FALSE;
             }
@@ -478,8 +476,7 @@
     }
 
     @Override
-    public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg)
-            throws AlgebricksException {
+    public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.TOKENIZE)
             return Boolean.FALSE;
@@ -507,6 +504,12 @@
         return Boolean.TRUE;
     }
 
+    @Override
+    public Boolean visitExternalDataLookupOperator(ExternalDataLookupOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        return Boolean.FALSE;
+    }
+
     private Boolean compareIOrderAndExpressions(List<Pair<IOrder, Mutable<ILogicalExpression>>> opOrderExprs,
             List<Pair<IOrder, Mutable<ILogicalExpression>>> argOrderExprs) {
         if (opOrderExprs.size() != argOrderExprs.size())
@@ -522,24 +525,9 @@
         return Boolean.TRUE;
     }
 
-    private Boolean checkBottomUp(ILogicalOperator op1, ILogicalOperator op2) throws AlgebricksException {
-        List<Mutable<ILogicalOperator>> inputs1 = op1.getInputs();
-        List<Mutable<ILogicalOperator>> inputs2 = op2.getInputs();
-        if (inputs1.size() != inputs2.size())
-            return Boolean.FALSE;
-        for (int i = 0; i < inputs1.size(); i++) {
-            ILogicalOperator input1 = inputs1.get(i).getValue();
-            ILogicalOperator input2 = inputs2.get(i).getValue();
-            boolean isomorphic = checkBottomUp(input1, input2);
-            if (!isomorphic)
-                return Boolean.FALSE;
-        }
-        return IsomorphismUtilities.isOperatorIsomorphic(op1, op2);
-    }
-
     private ILogicalOperator copyAndSubstituteVar(ILogicalOperator op, ILogicalOperator argOp)
             throws AlgebricksException {
-        ILogicalOperator newOp = IsomorphismOperatorVisitor.deepCopy(argOp);
+        ILogicalOperator newOp = OperatorManipulationUtil.deepCopy(argOp);
         variableMapping.clear();
         IsomorphismUtilities.mapVariablesTopDown(op, argOp, variableMapping);
 
@@ -575,27 +563,6 @@
         return list;
     }
 
-    private static ILogicalOperator deepCopy(ILogicalOperator op) throws AlgebricksException {
-        OperatorDeepCopyVisitor visitor = new OperatorDeepCopyVisitor();
-        return op.accept(visitor, null);
-    }
-
-    private static ILogicalPlan deepCopy(ILogicalPlan plan) throws AlgebricksException {
-        List<Mutable<ILogicalOperator>> roots = plan.getRoots();
-        List<Mutable<ILogicalOperator>> newRoots = new ArrayList<Mutable<ILogicalOperator>>();
-        for (Mutable<ILogicalOperator> opRef : roots)
-            newRoots.add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(opRef.getValue())));
-        return new ALogicalPlanImpl(newRoots);
-    }
-
-    private static ILogicalOperator bottomUpCopyOperators(ILogicalOperator op) throws AlgebricksException {
-        ILogicalOperator newOp = deepCopy(op);
-        newOp.getInputs().clear();
-        for (Mutable<ILogicalOperator> child : op.getInputs())
-            newOp.getInputs().add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(child.getValue())));
-        return newOp;
-    }
-
     private static boolean variableEqual(LogicalVariable var, LogicalVariable varArg) {
         if (var == null && varArg == null)
             return true;
@@ -605,316 +572,4 @@
             return false;
     }
 
-    private static class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogicalOperator, Void> {
-
-        @Override
-        public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
-            ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            newList.addAll(op.getVariables());
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new AggregateOperator(newList, newExpressions);
-        }
-
-        @Override
-        public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
-                throws AlgebricksException {
-            ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            newList.addAll(op.getVariables());
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new RunningAggregateOperator(newList, newExpressions);
-        }
-
-        @Override
-        public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
-                throws AlgebricksException {
-            return new EmptyTupleSourceOperator();
-        }
-
-        @Override
-        public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
-            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decoList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
-            ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
-            for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getGroupByList())
-                groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
-                        deepCopyExpressionRef(pair.second)));
-            for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getDecorList())
-                decoList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
-                        deepCopyExpressionRef(pair.second)));
-            for (ILogicalPlan plan : op.getNestedPlans()) {
-                newSubplans.add(IsomorphismOperatorVisitor.deepCopy(plan));
-            }
-            return new GroupByOperator(groupByList, decoList, newSubplans);
-        }
-
-        @Override
-        public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
-            return new LimitOperator(deepCopyExpressionRef(op.getMaxObjects()).getValue(), deepCopyExpressionRef(
-                    op.getOffset()).getValue(), op.isTopmostLimitOp());
-        }
-
-        @Override
-        public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
-            return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
-                    .getInputs().get(1));
-        }
-
-        @Override
-        public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg)
-                throws AlgebricksException {
-            return new LeftOuterJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
-                    .getInputs().get(1));
-        }
-
-        @Override
-        public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
-                throws AlgebricksException {
-            return new NestedTupleSourceOperator(null);
-        }
-
-        @Override
-        public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
-            return new OrderOperator(this.deepCopyOrderAndExpression(op.getOrderExpressions()));
-        }
-
-        @Override
-        public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
-            ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            newList.addAll(op.getVariables());
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new AssignOperator(newList, newExpressions);
-        }
-
-        @Override
-        public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
-            return new SelectOperator(deepCopyExpressionRef(op.getCondition()), op.getRetainNull(),
-                    op.getNullPlaceholderVariable());
-        }
-
-        @Override
-        public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
-            ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
-            newList.addAll(op.getVariables());
-            return new ProjectOperator(newList);
-        }
-
-        @Override
-        public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
-                throws AlgebricksException {
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex());
-        }
-
-        @Override
-        public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
-            return new ReplicateOperator(op.getOutputArity());
-        }
-
-        @Override
-        public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
-            return new MaterializeOperator();
-        }
-
-        @Override
-        public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
-            ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
-            ArrayList<LogicalVariable> newOutputList = new ArrayList<LogicalVariable>();
-            newInputList.addAll(op.getInputVariables());
-            newOutputList.addAll(op.getOutputVariables());
-            return new ScriptOperator(op.getScriptDescription(), newInputList, newOutputList);
-        }
-
-        @Override
-        public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
-            ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
-            for (ILogicalPlan plan : op.getNestedPlans()) {
-                newSubplans.add(IsomorphismOperatorVisitor.deepCopy(plan));
-            }
-            return new SubplanOperator(newSubplans);
-        }
-
-        @Override
-        public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
-            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> newVarMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
-            List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
-            for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varMap)
-                newVarMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(triple.first,
-                        triple.second, triple.third));
-            return new UnionAllOperator(newVarMap);
-        }
-
-        @Override
-        public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
-            return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
-                    op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
-        }
-
-        @Override
-        public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
-            ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
-            newInputList.addAll(op.getVariables());
-            return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
-                    new ArrayList<Object>(op.getVariableTypes()), op.propagatesInput());
-        }
-
-        @Override
-        public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
-            ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
-            newInputList.addAll(op.getVariables());
-            return new DataSourceScanOperator(newInputList, op.getDataSource());
-        }
-
-        @Override
-        public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new DistinctOperator(newExpressions);
-        }
-
-        @Override
-        public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
-            return new ExchangeOperator();
-        }
-
-        @Override
-        public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new WriteOperator(newExpressions, op.getDataSink());
-        }
-
-        @Override
-        public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
-                throws AlgebricksException {
-            ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newExpressions, op.getExpressions());
-            return new DistributeResultOperator(newExpressions, op.getDataSink());
-        }
-
-        @Override
-        public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
-            ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
-            List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
-            WriteResultOperator writeResultOp = new WriteResultOperator(op.getDataSource(),
-                    deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions);
-            writeResultOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
-            return writeResultOp;
-        }
-
-        @Override
-        public ILogicalOperator visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
-            List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
-            List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
-            InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
-                    deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(), op.isBulkload());
-            insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
-            return insertDeleteOp;
-        }
-
-        @Override
-        public ILogicalOperator visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg)
-                throws AlgebricksException {
-            List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
-            List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
-                    ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
-            List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
-            IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
-                    newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(), op.isBulkload());
-            indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
-            return indexInsertDeleteOp;
-                }
-
-        @Override
-        public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg)
-                throws AlgebricksException {
-            List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
-            List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
-            deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
-            List<LogicalVariable> newTokenizeVars = new ArrayList<LogicalVariable>();
-            deepCopyVars(newTokenizeVars, op.getTokenizeVars());
-            Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
-                    ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
-            List<Object> newTokenizeVarTypes = new ArrayList<Object>();
-            deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
-
-            TokenizeOperator tokenizeOp = new TokenizeOperator(op.getDataSourceIndex(),
-                    newPrimaryKeyExpressions, newSecondaryKeyExpressions,
-                    newTokenizeVars, newFilterExpression, op.getOperation(),
-                    op.isBulkload(), op.isPartitioned(), newTokenizeVarTypes);
-            return tokenizeOp;
-        }
-
-
-        @Override
-        public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
-            return new SinkOperator();
-        }
-
-        private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
-                List<Mutable<ILogicalExpression>> oldExprs) {
-            for (Mutable<ILogicalExpression> oldExpr : oldExprs)
-                newExprs.add(new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue())
-                        .cloneExpression()));
-        }
-
-        private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExpr) {
-            return new MutableObject<ILogicalExpression>(
-                    ((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
-        }
-
-        private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
-            for (LogicalVariable oldVar : oldVars)
-                newVars.add(oldVar);
-            return newVars;
-        }
-
-        private List<Object> deepCopyObjects(List<Object> newObjs, List<Object> oldObjs) {
-            for (Object oldObj : oldObjs)
-                newObjs.add(oldObj);
-            return newObjs;
-        }
-
-        private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
-                List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
-            List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
-            for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs)
-                newOrdersAndExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first,
-                        deepCopyExpressionRef(pair.second)));
-            return newOrdersAndExprs;
-        }
-
-        @Override
-        public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
-            return new ExtensionOperator(op.getNewInstanceOfDelegateOperator());
-        }
-
-        @Override
-        public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg)
-                throws AlgebricksException {
-            ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
-            newInputList.addAll(op.getVariables());
-            return new ExternalDataLookupOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
-                    new ArrayList<Object>(op.getVariableTypes()), op.isPropagateInput());
-        }
-    }
-
-    @Override
-    public Boolean visitExternalDataLookupOperator(ExternalDataLookupOperator op, ILogicalOperator arg)
-            throws AlgebricksException {
-        return Boolean.FALSE;
-    }
-
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
index 575e062..d79853d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -14,10 +14,14 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
+import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.mutable.Mutable;
+
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
 public class IsomorphismUtilities {
@@ -33,4 +37,34 @@
         return op.accept(visitor, arg).booleanValue();
     }
 
+    public static boolean isOperatorIsomorphicPlanSegment(ILogicalOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> inputs1 = op.getInputs();
+        List<Mutable<ILogicalOperator>> inputs2 = arg.getInputs();
+        if (inputs1.size() != inputs2.size())
+            return Boolean.FALSE;
+        for (int i = 0; i < inputs1.size(); i++) {
+            ILogicalOperator input1 = inputs1.get(i).getValue();
+            ILogicalOperator input2 = inputs2.get(i).getValue();
+            boolean isomorphic = isOperatorIsomorphicPlanSegment(input1, input2);
+            if (!isomorphic)
+                return Boolean.FALSE;
+        }
+        return IsomorphismUtilities.isOperatorIsomorphic(op, arg);
+    }
+
+    public static boolean isOperatorIsomorphicPlan(ILogicalPlan plan, ILogicalPlan arg) throws AlgebricksException {
+        if (plan.getRoots().size() != arg.getRoots().size()) {
+            return false;
+        }
+        for (int i = 0; i < plan.getRoots().size(); i++) {
+            ILogicalOperator topOp1 = plan.getRoots().get(i).getValue();
+            ILogicalOperator topOp2 = arg.getRoots().get(i).getValue();
+            if (!IsomorphismUtilities.isOperatorIsomorphicPlanSegment(topOp1, topOp2)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
new file mode 100644
index 0000000..ebb195f
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -0,0 +1,368 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogicalOperator, Void> {
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        newList.addAll(op.getVariables());
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new AggregateOperator(newList, newExpressions);
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
+            throws AlgebricksException {
+        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        newList.addAll(op.getVariables());
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new RunningAggregateOperator(newList, newExpressions);
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        return new EmptyTupleSourceOperator();
+    }
+
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decoList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getGroupByList())
+            groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
+                    deepCopyExpressionRef(pair.second)));
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getDecorList())
+            decoList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
+                    deepCopyExpressionRef(pair.second)));
+        for (ILogicalPlan plan : op.getNestedPlans()) {
+            newSubplans.add(OperatorManipulationUtil.deepCopy(plan));
+        }
+        return new GroupByOperator(groupByList, decoList, newSubplans);
+    }
+
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        return new LimitOperator(deepCopyExpressionRef(op.getMaxObjects()).getValue(), deepCopyExpressionRef(
+                op.getOffset()).getValue(), op.isTopmostLimitOp());
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op.getInputs()
+                .get(1));
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        return new LeftOuterJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
+                .getInputs().get(1));
+    }
+
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        return new NestedTupleSourceOperator(null);
+    }
+
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        return new OrderOperator(this.deepCopyOrderAndExpression(op.getOrderExpressions()));
+    }
+
+    @Override
+    public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        newList.addAll(op.getVariables());
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new AssignOperator(newList, newExpressions);
+    }
+
+    @Override
+    public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        return new SelectOperator(deepCopyExpressionRef(op.getCondition()), op.getRetainNull(),
+                op.getNullPlaceholderVariable());
+    }
+
+    @Override
+    public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        ArrayList<LogicalVariable> newList = new ArrayList<LogicalVariable>();
+        newList.addAll(op.getVariables());
+        return new ProjectOperator(newList);
+    }
+
+    @Override
+    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
+            throws AlgebricksException {
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex());
+    }
+
+    @Override
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return new ReplicateOperator(op.getOutputArity());
+    }
+
+    @Override
+    public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        ArrayList<LogicalVariable> newOutputList = new ArrayList<LogicalVariable>();
+        newInputList.addAll(op.getInputVariables());
+        newOutputList.addAll(op.getOutputVariables());
+        return new ScriptOperator(op.getScriptDescription(), newInputList, newOutputList);
+    }
+
+    @Override
+    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+        for (ILogicalPlan plan : op.getNestedPlans()) {
+            newSubplans.add(OperatorManipulationUtil.deepCopy(plan));
+        }
+        return new SubplanOperator(newSubplans);
+    }
+
+    @Override
+    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> newVarMap = new ArrayList<Triple<LogicalVariable, LogicalVariable, LogicalVariable>>();
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varMap)
+            newVarMap.add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(triple.first, triple.second,
+                    triple.third));
+        return new UnionAllOperator(newVarMap);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
+                op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        newInputList.addAll(op.getVariables());
+        return new UnnestMapOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()), new ArrayList<Object>(
+                op.getVariableTypes()), op.propagatesInput());
+    }
+
+    @Override
+    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        newInputList.addAll(op.getVariables());
+        return new DataSourceScanOperator(newInputList, op.getDataSource());
+    }
+
+    @Override
+    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new DistinctOperator(newExpressions);
+    }
+
+    @Override
+    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        return new ExchangeOperator();
+    }
+
+    @Override
+    public ILogicalOperator visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new WriteOperator(newExpressions, op.getDataSink());
+    }
+
+    @Override
+    public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
+            throws AlgebricksException {
+        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newExpressions, op.getExpressions());
+        return new DistributeResultOperator(newExpressions, op.getDataSink());
+    }
+
+    @Override
+    public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+        ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
+        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
+        WriteResultOperator writeResultOp = new WriteResultOperator(op.getDataSource(),
+                deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions);
+        writeResultOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
+        return writeResultOp;
+    }
+
+    @Override
+    public ILogicalOperator visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
+        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
+        InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
+                deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(), op.isBulkload());
+        insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
+        return insertDeleteOp;
+    }
+
+    @Override
+    public ILogicalOperator visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg)
+            throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
+        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
+        Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+                ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+        List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
+        IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
+                newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(),
+                op.isBulkload());
+        indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
+        return indexInsertDeleteOp;
+    }
+
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
+        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
+        List<LogicalVariable> newTokenizeVars = new ArrayList<LogicalVariable>();
+        deepCopyVars(newTokenizeVars, op.getTokenizeVars());
+        Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+                ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
+        List<Object> newTokenizeVarTypes = new ArrayList<Object>();
+        deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
+
+        TokenizeOperator tokenizeOp = new TokenizeOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
+                newSecondaryKeyExpressions, newTokenizeVars, newFilterExpression, op.getOperation(), op.isBulkload(),
+                op.isPartitioned(), newTokenizeVarTypes);
+        return tokenizeOp;
+    }
+
+    @Override
+    public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+        return new SinkOperator();
+    }
+
+    private void deepCopyExpressionRefs(List<Mutable<ILogicalExpression>> newExprs,
+            List<Mutable<ILogicalExpression>> oldExprs) {
+        for (Mutable<ILogicalExpression> oldExpr : oldExprs)
+            newExprs.add(new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue())
+                    .cloneExpression()));
+    }
+
+    private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExpr) {
+        return new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
+    }
+
+    private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
+        for (LogicalVariable oldVar : oldVars)
+            newVars.add(oldVar);
+        return newVars;
+    }
+
+    private List<Object> deepCopyObjects(List<Object> newObjs, List<Object> oldObjs) {
+        for (Object oldObj : oldObjs)
+            newObjs.add(oldObj);
+        return newObjs;
+    }
+
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+        for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs)
+            newOrdersAndExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first,
+                    deepCopyExpressionRef(pair.second)));
+        return newOrdersAndExprs;
+    }
+
+    @Override
+    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        return new ExtensionOperator(op.getNewInstanceOfDelegateOperator());
+    }
+
+    @Override
+    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg)
+            throws AlgebricksException {
+        ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
+        newInputList.addAll(op.getVariables());
+        return new ExternalDataLookupOperator(newInputList, deepCopyExpressionRef(op.getExpressionRef()),
+                new ArrayList<Object>(op.getVariableTypes()), op.isPropagateInput());
+    }
+
+    @Override
+    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return new MaterializeOperator();
+    }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index fc926d2..adb7a8f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.util;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -32,7 +33,9 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
 
 public class OperatorManipulationUtil {
@@ -170,4 +173,53 @@
         }
     }
 
+    public static ILogicalPlan deepCopy(ILogicalPlan plan) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> roots = plan.getRoots();
+        List<Mutable<ILogicalOperator>> newRoots = clonePipeline(roots);
+        return new ALogicalPlanImpl(newRoots);
+    }
+
+    public static ILogicalPlan deepCopy(ILogicalPlan plan, IOptimizationContext ctx) throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> roots = plan.getRoots();
+        List<Mutable<ILogicalOperator>> newRoots = clonePipeline(roots);
+        cloneTypeEnvironments(ctx, roots, newRoots);
+        return new ALogicalPlanImpl(newRoots);
+    }
+
+    private static List<Mutable<ILogicalOperator>> clonePipeline(List<Mutable<ILogicalOperator>> roots)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> newRoots = new ArrayList<Mutable<ILogicalOperator>>();
+        for (Mutable<ILogicalOperator> opRef : roots) {
+            newRoots.add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(opRef.getValue())));
+        }
+        return newRoots;
+    }
+
+    private static void cloneTypeEnvironments(IOptimizationContext ctx, List<Mutable<ILogicalOperator>> roots,
+            List<Mutable<ILogicalOperator>> newRoots) {
+        for (int i = 0; i < newRoots.size(); i++) {
+            Mutable<ILogicalOperator> opRef = newRoots.get(i);
+            Mutable<ILogicalOperator> oldOpRef = roots.get(i);
+            while (opRef.getValue().getInputs().size() > 0) {
+                ctx.setOutputTypeEnvironment(opRef.getValue(), ctx.getOutputTypeEnvironment(oldOpRef.getValue()));
+                opRef = opRef.getValue().getInputs().get(0);
+                oldOpRef = oldOpRef.getValue().getInputs().get(0);
+            }
+            ctx.setOutputTypeEnvironment(opRef.getValue(), ctx.getOutputTypeEnvironment(oldOpRef.getValue()));
+        }
+    }
+
+    public static ILogicalOperator bottomUpCopyOperators(ILogicalOperator op) throws AlgebricksException {
+        ILogicalOperator newOp = deepCopy(op);
+        newOp.getInputs().clear();
+        for (Mutable<ILogicalOperator> child : op.getInputs())
+            newOp.getInputs().add(new MutableObject<ILogicalOperator>(bottomUpCopyOperators(child.getValue())));
+        return newOp;
+    }
+
+    public static ILogicalOperator deepCopy(ILogicalOperator op) throws AlgebricksException {
+        OperatorDeepCopyVisitor visitor = new OperatorDeepCopyVisitor();
+        return op.accept(visitor, null);
+    }
+
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index c8a76fb..67b2d0c 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -18,6 +18,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -48,9 +49,8 @@
     /**
      * Replace the original aggregate functions with their corresponding global aggregate function.
      */
-    public void replaceOriginalAggFuncs(Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap) {
-        for (Map.Entry<AggregateFunctionCallExpression, SimilarAggregatesInfo> entry : toReplaceMap.entrySet()) {
-            SimilarAggregatesInfo sai = entry.getValue();
+    protected void replaceOriginalAggFuncs(Set<SimilarAggregatesInfo> toReplaceSet) {
+        for (SimilarAggregatesInfo sai : toReplaceSet) {
             for (AggregateExprInfo aei : sai.simAggs) {
                 AbstractFunctionCallExpression afce = (AbstractFunctionCallExpression) aei.aggExprRef.getValue();
                 afce.setFunctionInfo(aei.newFunInfo);
@@ -61,8 +61,8 @@
     }
 
     protected Pair<Boolean, Mutable<ILogicalOperator>> tryToPushAgg(AggregateOperator initAgg,
-            GroupByOperator newGbyOp, Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap,
-            IOptimizationContext context) throws AlgebricksException {
+            GroupByOperator newGbyOp, Set<SimilarAggregatesInfo> toReplaceSet, IOptimizationContext context)
+            throws AlgebricksException {
 
         ArrayList<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
         ArrayList<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
@@ -91,17 +91,15 @@
                 newArgs.add(new MutableObject<ILogicalExpression>(er.getValue().cloneExpression()));
             }
             IFunctionInfo fi2 = aggFun.getStepTwoAggregate();
-            SimilarAggregatesInfo inf = toReplaceMap.get(aggFun);
-            if (inf == null) {
-                inf = new SimilarAggregatesInfo();
-                LogicalVariable newAggVar = context.newVar();
-                pushedVars.add(newAggVar);
-                inf.stepOneResult = new VariableReferenceExpression(newAggVar);
-                inf.simAggs = new ArrayList<AggregateExprInfo>();
-                toReplaceMap.put(aggFun, inf);
-                AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
-                pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
-            }
+
+            SimilarAggregatesInfo inf = new SimilarAggregatesInfo();
+            LogicalVariable newAggVar = context.newVar();
+            pushedVars.add(newAggVar);
+            inf.stepOneResult = new VariableReferenceExpression(newAggVar);
+            inf.simAggs = new ArrayList<AggregateExprInfo>();
+            toReplaceSet.add(inf);
+            AggregateFunctionCallExpression aggLocal = new AggregateFunctionCallExpression(fi1, false, newArgs);
+            pushedExprs.add(new MutableObject<ILogicalExpression>(aggLocal));
             AggregateExprInfo aei = new AggregateExprInfo();
             aei.aggExprRef = expRef;
             aei.newFunInfo = fi2;
@@ -114,11 +112,22 @@
             pushedAgg.setExecutionMode(ExecutionMode.LOCAL);
             // If newGbyOp is null, then we optimizing an aggregate without group by.
             if (newGbyOp != null) {
+                // Cut and paste nested input pipelines of initAgg to pushedAgg's input
+                Mutable<ILogicalOperator> inputRef = initAgg.getInputs().get(0);
+                Mutable<ILogicalOperator> bottomRef = inputRef;
+                while (bottomRef.getValue().getInputs().size() > 0) {
+                    bottomRef = bottomRef.getValue().getInputs().get(0);
+                }
+                ILogicalOperator oldNts = bottomRef.getValue();
+                initAgg.getInputs().clear();
+                initAgg.getInputs().add(new MutableObject<ILogicalOperator>(oldNts));
+
                 // Hook up the nested aggregate op with the outer group by.
                 NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(
                         newGbyOp));
                 nts.setExecutionMode(ExecutionMode.LOCAL);
-                pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+                bottomRef.setValue(nts);
+                pushedAgg.getInputs().add(inputRef);
             } else {
                 // The local aggregate operator is fed by the input of the original aggregate operator.
                 pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
@@ -144,7 +153,6 @@
     }
 
     protected class BookkeepingInfo {
-        Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
         Map<GroupByOperator, List<LogicalVariable>> modifyGbyMap = new HashMap<GroupByOperator, List<LogicalVariable>>();
     }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index aba68f9..93f181f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -16,6 +16,7 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,7 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -39,6 +41,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
@@ -66,12 +69,18 @@
             return false;
         }
 
-        replaceOriginalAggFuncs(bi.toReplaceMap);
-
+        Set<LogicalVariable> newGbyLiveVars = new ListSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(newGbyOp, newGbyLiveVars);
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyOp.getDecorList()) {
-            LogicalVariable newDecorVar = context.newVar();
-            newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
-            p.second.setValue(new VariableReferenceExpression(newDecorVar));
+            List<LogicalVariable> usedDecorVars = new ArrayList<LogicalVariable>();
+            // p.second.getValue() should always return a VariableReferenceExpression, hence
+            // usedDecorVars should always contain only one variable.
+            p.second.getValue().getUsedVariables(usedDecorVars);
+            if (!newGbyLiveVars.contains(usedDecorVars.get(0))) {
+                LogicalVariable newDecorVar = context.newVar();
+                newGbyOp.addDecorExpression(newDecorVar, p.second.getValue());
+                p.second.setValue(new VariableReferenceExpression(newDecorVar));
+            }
         }
         newGbyOp.setExecutionMode(ExecutionMode.LOCAL);
         Object v = gbyOp.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY);
@@ -175,8 +184,9 @@
             GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
             throws AlgebricksException {
         List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+        Set<SimilarAggregatesInfo> toReplaceSet = new HashSet<SimilarAggregatesInfo>();
         for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
-            if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots)) {
+            if (!tryToPushRoot(r, oldGbyOp, newGbyOp, bi, gbyVars, context, pushedRoots, toReplaceSet)) {
                 // For now, if we cannot push everything, give up.
                 return new Pair<Boolean, ILogicalPlan>(false, null);
             }
@@ -184,21 +194,69 @@
         if (pushedRoots.isEmpty()) {
             return new Pair<Boolean, ILogicalPlan>(true, null);
         } else {
-            return new Pair<Boolean, ILogicalPlan>(true, new ALogicalPlanImpl(pushedRoots));
+            // Replaces the aggregation expressions in the original group-by op with new ones.
+            ILogicalPlan newPlan = new ALogicalPlanImpl(pushedRoots);
+            ILogicalPlan plan = fingIdenticalPlan(newGbyOp, newPlan);
+            replaceOriginalAggFuncs(toReplaceSet);
+            if (plan == null) {
+                return new Pair<Boolean, ILogicalPlan>(true, newPlan);
+            } else {
+                // Does not add a nested subplan to newGbyOp if there already exists an isomorphic plan.
+                Set<LogicalVariable> originalVars = new ListSet<LogicalVariable>();
+                Set<LogicalVariable> newVars = new ListSet<LogicalVariable>();
+                for (Mutable<ILogicalOperator> rootRef : pushedRoots) {
+                    VariableUtilities.getProducedVariables(rootRef.getValue(), originalVars);
+                }
+                for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
+                    VariableUtilities.getProducedVariables(rootRef.getValue(), newVars);
+                }
+
+                // Replaces variable exprs referring to the variables produced by newPlan by 
+                // those produced by plan.
+                Iterator<LogicalVariable> originalVarIter = originalVars.iterator();
+                Iterator<LogicalVariable> newVarIter = newVars.iterator();
+                while (originalVarIter.hasNext()) {
+                    LogicalVariable originalVar = originalVarIter.next();
+                    LogicalVariable newVar = newVarIter.next();
+                    for (SimilarAggregatesInfo sai : toReplaceSet) {
+                        for (AggregateExprInfo aei : sai.simAggs) {
+                            ILogicalExpression afce = aei.aggExprRef.getValue();
+                            afce.substituteVar(originalVar, newVar);
+                        }
+                    }
+                }
+                return new Pair<Boolean, ILogicalPlan>(true, null);
+            }
         }
     }
 
+    private ILogicalPlan fingIdenticalPlan(GroupByOperator newGbyOp, ILogicalPlan plan) throws AlgebricksException {
+        for (ILogicalPlan nestedPlan : newGbyOp.getNestedPlans()) {
+            if (IsomorphismUtilities.isOperatorIsomorphicPlan(plan, nestedPlan)) {
+                return nestedPlan;
+            }
+        }
+        return null;
+    }
+
     private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
             BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
-            List<Mutable<ILogicalOperator>> toPushAccumulate) throws AlgebricksException {
+            List<Mutable<ILogicalOperator>> toPushAccumulate, Set<SimilarAggregatesInfo> toReplaceSet)
+            throws AlgebricksException {
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
         if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
             return false;
         }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+        // Finds nested group-by if any.
+        AbstractLogicalOperator op3 = op2;
+        while (op3.getOperatorTag() != LogicalOperatorTag.GROUP && op3.getInputs().size() == 1) {
+            op3 = (AbstractLogicalOperator) op3.getInputs().get(0).getValue();
+        }
+
+        if (op3.getOperatorTag() != LogicalOperatorTag.GROUP) {
             AggregateOperator initAgg = (AggregateOperator) op1;
-            Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, bi.toReplaceMap, context);
+            Pair<Boolean, Mutable<ILogicalOperator>> pOpRef = tryToPushAgg(initAgg, newGbyOp, toReplaceSet, context);
             if (!pOpRef.first) {
                 return false;
             }
@@ -209,19 +267,14 @@
             bi.modifyGbyMap.put(oldGbyOp, gbyVars);
             return true;
         } else {
-            while (op2.getOperatorTag() != LogicalOperatorTag.GROUP && op2.getInputs().size() == 1) {
-                op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
-            }
-            if (op2.getOperatorTag() != LogicalOperatorTag.GROUP) {
-                return false;
-            }
-            GroupByOperator nestedGby = (GroupByOperator) op2;
+            GroupByOperator nestedGby = (GroupByOperator) op3;
             List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
             List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
             concatGbyVars.addAll(gbyVars2);
             for (ILogicalPlan p : nestedGby.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> r2 : p.getRoots()) {
-                    if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate)) {
+                    if (!tryToPushRoot(r2, nestedGby, newGbyOp, bi, concatGbyVars, context, toPushAccumulate,
+                            toReplaceSet)) {
                         return false;
                     }
                 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 3da9e76..07621ce 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -49,12 +49,12 @@
 
 public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
 
-    private HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
-    private List<Mutable<ILogicalOperator>> roots = new ArrayList<Mutable<ILogicalOperator>>();
-    private List<List<Mutable<ILogicalOperator>>> equivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
-    private HashMap<Mutable<ILogicalOperator>, BitSet> opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>();
-    private HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<Mutable<ILogicalOperator>, MutableInt>();
-    private HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<Integer, BitSet>();
+    private final HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+    private final List<Mutable<ILogicalOperator>> roots = new ArrayList<Mutable<ILogicalOperator>>();
+    private final List<List<Mutable<ILogicalOperator>>> equivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
+    private final HashMap<Mutable<ILogicalOperator>, BitSet> opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>();
+    private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<Mutable<ILogicalOperator>, MutableInt>();
+    private final HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<Integer, BitSet>();
     private int lastUsedClusterId = 0;
 
     @Override
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index e702d9f..51f2025 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class IntroHashPartitionMergeExchange implements IAlgebraicRewriteRule {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
index ee1b7df..b0dbb1e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -14,8 +14,8 @@
  */
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
@@ -24,7 +24,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -46,12 +45,12 @@
         if (!aggOp.isGlobal() || aggOp.getExecutionMode() == ExecutionMode.LOCAL) {
             return false;
         }
-        Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
-        Pair<Boolean, Mutable<ILogicalOperator>> result = tryToPushAgg(aggOp, null, toReplaceMap, context);
+        Set<SimilarAggregatesInfo> toReplaceSet = new HashSet<SimilarAggregatesInfo>();
+        Pair<Boolean, Mutable<ILogicalOperator>> result = tryToPushAgg(aggOp, null, toReplaceSet, context);
         if (!result.first || result.second == null) {
             return false;
         }
-        replaceOriginalAggFuncs(toReplaceMap);
+        replaceOriginalAggFuncs(toReplaceSet);
         context.computeAndSetTypeEnvironmentForOperator(aggOp);
         return true;
     }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
index 4dfede0..1eef9f8 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSubplanIntoGroupByRule.java
@@ -1,25 +1,40 @@
 /*
+
  * Copyright 2009-2013 by The Regents of the University of California
+
  * Licensed under the Apache License, Version 2.0 (the "License");
+
  * you may not use this file except in compliance with the License.
+
  * you may obtain a copy of the License from
+
  * 
+
  *     http://www.apache.org/licenses/LICENSE-2.0
+
  * 
+
  * Unless required by applicable law or agreed to in writing, software
+
  * distributed under the License is distributed on an "AS IS" BASIS,
+
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
  * See the License for the specific language governing permissions and
+
  * limitations under the License.
+
  */
 
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
@@ -29,92 +44,168 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
- * This rule pushes a subplan on top of a group-by into the
+ * This rule pushes an array of subplans on top of a group-by into the
  * nested plan of the group-by.
  * 
  * @author yingyib
  */
-public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
 
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
+public class PushSubplanIntoGroupByRule implements IAlgebraicRewriteRule {
+    /** Stores used variables above the current operator. */
+    private final Set<LogicalVariable> usedVarsSoFar = new HashSet<LogicalVariable>();
 
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         ILogicalOperator parentOperator = opRef.getValue();
+        if (context.checkIfInDontApplySet(this, parentOperator)) {
+            return false;
+        }
+        context.addToDontApplySet(this, parentOperator);
+        VariableUtilities.getUsedVariables(parentOperator, usedVarsSoFar);
         if (parentOperator.getInputs().size() <= 0) {
             return false;
         }
         boolean changed = false;
+        GroupByOperator gby = null;
         for (Mutable<ILogicalOperator> ref : parentOperator.getInputs()) {
             AbstractLogicalOperator op = (AbstractLogicalOperator) ref.getValue();
             /** Only processes subplan operator. */
+            List<SubplanOperator> subplans = new ArrayList<SubplanOperator>();
             if (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-                AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-                /** Only processes the case a group-by operator is the input of the subplan operator. */
-                if (child.getOperatorTag() == LogicalOperatorTag.GROUP) {
-                    SubplanOperator subplan = (SubplanOperator) op;
-                    GroupByOperator gby = (GroupByOperator) child;
-                    List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans();
-                    List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans();
-                    List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>();
-                    for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) {
-                        List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots();
-                        List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>();
-                        for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) {
-                            /** Gets free variables in the root operator of a nested plan and its descent. */
-                            Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
-                            VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars);
-                            Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
-                            VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
-                                    producedVars);
-                            freeVars.removeAll(producedVars);
+                while (op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+                    SubplanOperator currentSubplan = (SubplanOperator) op;
+                    subplans.add(currentSubplan);
+                    op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+                }
+                /** Only processes the case a group-by operator is the input of the subplan operators. */
+                if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+                    gby = (GroupByOperator) op;
+                    List<ILogicalPlan> newGbyNestedPlans = new ArrayList<ILogicalPlan>();
+                    for (SubplanOperator subplan : subplans) {
+                        List<ILogicalPlan> subplanNestedPlans = subplan.getNestedPlans();
+                        List<ILogicalPlan> gbyNestedPlans = gby.getNestedPlans();
+                        List<ILogicalPlan> subplanNestedPlansToRemove = new ArrayList<ILogicalPlan>();
+                        for (ILogicalPlan subplanNestedPlan : subplanNestedPlans) {
+                            List<Mutable<ILogicalOperator>> rootOpRefs = subplanNestedPlan.getRoots();
+                            List<Mutable<ILogicalOperator>> rootOpRefsToRemove = new ArrayList<Mutable<ILogicalOperator>>();
+                            for (Mutable<ILogicalOperator> rootOpRef : rootOpRefs) {
+                                /** Gets free variables in the root operator of a nested plan and its descent. */
+                                Set<LogicalVariable> freeVars = new ListSet<LogicalVariable>();
+                                VariableUtilities.getUsedVariablesInDescendantsAndSelf(rootOpRef.getValue(), freeVars);
+                                Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+                                VariableUtilities.getProducedVariablesInDescendantsAndSelf(rootOpRef.getValue(),
+                                        producedVars);
+                                freeVars.removeAll(producedVars);
+                                /** * Checks whether the above freeVars are all contained in live variables * of one nested plan inside the group-by operator. * If yes, then the subplan can be pushed into the nested plan of the group-by. */
+                                for (ILogicalPlan gbyNestedPlanOriginal : gbyNestedPlans) {
+                                    // add a subplan in the original gby
+                                    if (!newGbyNestedPlans.contains(gbyNestedPlanOriginal)) {
+                                        newGbyNestedPlans.add(gbyNestedPlanOriginal);
+                                    }
 
-                            /**
-                             * Checks whether the above freeVars are all contained in live variables
-                             * of one nested plan inside the group-by operator.
-                             * If yes, then the subplan can be pushed into the nested plan of the group-by.
-                             */
-                            for (ILogicalPlan gbyNestedPlan : gbyNestedPlans) {
-                                List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
-                                for (Mutable<ILogicalOperator> gbyRootOpRef : gbyRootOpRefs) {
-                                    Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>();
-                                    VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
-                                    if (liveVars.containsAll(freeVars)) {
-                                        /** Does the actual push. */
-                                        Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
-                                        ntsRef.setValue(gbyRootOpRef.getValue());
-                                        gbyRootOpRef.setValue(rootOpRef.getValue());
-                                        rootOpRefsToRemove.add(rootOpRef);
-                                        changed = true;
+                                    // add a pushed subplan
+                                    ILogicalPlan gbyNestedPlan = OperatorManipulationUtil.deepCopy(
+                                            gbyNestedPlanOriginal, context);
+                                    List<Mutable<ILogicalOperator>> gbyRootOpRefs = gbyNestedPlan.getRoots();
+                                    for (int rootIndex = 0; rootIndex < gbyRootOpRefs.size(); rootIndex++) {
+                                        //set the nts for a original subplan
+                                        Mutable<ILogicalOperator> originalGbyRootOpRef = gbyNestedPlanOriginal
+                                                .getRoots().get(rootIndex);
+                                        Mutable<ILogicalOperator> originalGbyNtsRef = downToNts(originalGbyRootOpRef);
+                                        NestedTupleSourceOperator originalNts = (NestedTupleSourceOperator) originalGbyNtsRef
+                                                .getValue();
+                                        originalNts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby));
+
+                                        //push a new subplan if possible
+                                        Mutable<ILogicalOperator> gbyRootOpRef = gbyRootOpRefs.get(rootIndex);
+                                        Set<LogicalVariable> liveVars = new ListSet<LogicalVariable>();
+                                        VariableUtilities.getLiveVariables(gbyRootOpRef.getValue(), liveVars);
+                                        if (liveVars.containsAll(freeVars)) {
+                                            /** Does the actual push. */
+                                            Mutable<ILogicalOperator> ntsRef = downToNts(rootOpRef);
+                                            ntsRef.setValue(gbyRootOpRef.getValue());
+                                            // Removes unused vars.
+                                            AggregateOperator aggOp = (AggregateOperator) gbyRootOpRef.getValue();
+                                            for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
+                                                if (!freeVars.contains(aggOp.getVariables().get(varIndex))) {
+                                                    aggOp.getVariables().remove(varIndex);
+                                                    aggOp.getExpressions().remove(varIndex);
+                                                }
+                                            }
+
+                                            gbyRootOpRef.setValue(rootOpRef.getValue());
+                                            rootOpRefsToRemove.add(rootOpRef);
+
+                                            // Sets the nts for a new pushed plan.
+                                            Mutable<ILogicalOperator> oldGbyNtsRef = downToNts(gbyRootOpRef);
+                                            NestedTupleSourceOperator nts = (NestedTupleSourceOperator) oldGbyNtsRef
+                                                    .getValue();
+                                            nts.setDataSourceReference(new MutableObject<ILogicalOperator>(gby));
+
+                                            newGbyNestedPlans.add(gbyNestedPlan);
+                                            changed = true;
+                                            continue;
+                                        }
                                     }
                                 }
                             }
+                            rootOpRefs.removeAll(rootOpRefsToRemove);
+                            if (rootOpRefs.size() == 0) {
+                                subplanNestedPlansToRemove.add(subplanNestedPlan);
+                            }
                         }
-                        rootOpRefs.removeAll(rootOpRefsToRemove);
-                        if (rootOpRefs.size() == 0) {
-                            subplanNestedPlansToRemove.add(subplanNestedPlan);
-                        }
+                        subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
                     }
-                    subplanNestedPlans.removeAll(subplanNestedPlansToRemove);
-                    if (subplanNestedPlans.size() == 0) {
+                    if (changed) {
                         ref.setValue(gby);
+                        gby.getNestedPlans().clear();
+                        gby.getNestedPlans().addAll(newGbyNestedPlans);
                     }
                 }
             }
         }
+        if (changed) {
+            cleanup(gby);
+        }
         return changed;
     }
 
+    /**
+     * Removes unused aggregation variables (and expressions)
+     * 
+     * @param gby
+     * @throws AlgebricksException
+     */
+    private void cleanup(GroupByOperator gby) throws AlgebricksException {
+        for (ILogicalPlan nestedPlan : gby.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+                AggregateOperator aggOp = (AggregateOperator) rootRef.getValue();
+                for (int varIndex = aggOp.getVariables().size() - 1; varIndex >= 0; varIndex--) {
+                    if (!usedVarsSoFar.contains(aggOp.getVariables().get(varIndex))) {
+                        aggOp.getVariables().remove(varIndex);
+                        aggOp.getExpressions().remove(varIndex);
+                    }
+                }
+            }
+
+        }
+    }
+
     private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> opRef) {
         Mutable<ILogicalOperator> currentOpRef = opRef;
         while (currentOpRef.getValue().getInputs().size() > 0) {
@@ -122,4 +213,5 @@
         }
         return currentOpRef;
     }
-}
+
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index 6e0d55c..de4fbfb 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.algebricks.rewriter.rules;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -23,6 +24,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -87,6 +89,26 @@
                     removeUnusedAssigns(r, toRemove, context);
                 }
             }
+
+            // Removes redundant nested plans that produces nothing
+            for (int i = opWithNest.getNestedPlans().size() - 1; i >= 0; i--) {
+                ILogicalPlan nestedPlan = opWithNest.getNestedPlans().get(i);
+                List<Mutable<ILogicalOperator>> rootsToBeRemoved = new ArrayList<Mutable<ILogicalOperator>>();
+                for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
+                    ILogicalOperator topOp = r.getValue();
+                    Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+                    VariableUtilities.getProducedVariablesInDescendantsAndSelf(topOp, producedVars);
+                    if (producedVars.size() == 0) {
+                        rootsToBeRemoved.add(r);
+                    }
+                }
+                // Makes sure the operator should have at least ONE nested plan even it is empty 
+                // (because a lot of places uses this assumption,  TODO(yingyib): clean them up).
+                if (nestedPlan.getRoots().size() == rootsToBeRemoved.size() && opWithNest.getNestedPlans().size() > 1) {
+                    nestedPlan.getRoots().removeAll(rootsToBeRemoved);
+                    opWithNest.getNestedPlans().remove(nestedPlan);
+                }
+            }
         }
     }