ASTERIXDB-1005, ASTERIXDB-1263: clean up subplan flattening, including:
1. Fixed the data property progation in HashJoin, NestedLoopJoin, PreClusteredGroupBy, and BroadcastExchange;
2. Fixed race conditions in SplitOperatorDescriptor;
3. Added a top-down pass for JobBuilder to set location constraints;
4. Fixed AbstractIntroduceGroupByCombinerRule for general cases.

Change-Id: I0197dc879cf983577e63ea5c047144966c0f7a3c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/572
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index a2ed2ae..813e58f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -41,6 +41,7 @@
 
     public abstract LogicalVariable newVar();
 
+    @Override
     public abstract IMetadataProvider<?, ?> getMetadataProvider();
 
     public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
@@ -53,7 +54,7 @@
      * returns true if op1 and op2 have already been compared
      */
     public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
-    
+
     public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
 
     public abstract void addNotToBeInlinedVar(LogicalVariable var);
@@ -72,6 +73,8 @@
 
     public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
 
+    public void clearAllFDAndEquivalenceClasses();
+
     public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
 
     public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
@@ -89,6 +92,6 @@
     public abstract void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException;
 
     public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
-    
+
     public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
deleted file mode 100644
index d8089bd..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
-import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
-
-/**
- * This visitor is to add back variables that are killed in the query plan rooted at an input operator.
- * After visiting, it also provides a variable map for variables that have been
- * mapped in the query plan, e.g., by group-by, assign, and union.
- */
-class EnforceVariablesVisitor implements IQueryOperatorVisitor<ILogicalOperator, Collection<LogicalVariable>> {
-    private final IOptimizationContext context;
-    private final Map<LogicalVariable, LogicalVariable> inputVarToOutputVarMap = new HashMap<>();
-
-    public EnforceVariablesVisitor(IOptimizationContext context) {
-        this.context = context;
-    }
-
-    public Map<LogicalVariable, LogicalVariable> getInputVariableToOutputVariableMap() {
-        return inputVarToOutputVarMap;
-    }
-
-    @Override
-    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return rewriteAggregateOperator(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return rewriteAggregateOperator(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.removeAll(liveVars);
-
-        // Maps group by key variables if the corresponding expressions are VariableReferenceExpressions.
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> keyVarExprRef : op.getGroupByList()) {
-            ILogicalExpression expr = keyVarExprRef.second.getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
-                LogicalVariable sourceVar = varExpr.getVariableReference();
-                updateVarMapping(sourceVar, keyVarExprRef.first);
-                varsToRecover.remove(sourceVar);
-            }
-        }
-
-        for (LogicalVariable varToRecover : varsToRecover) {
-            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
-            // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
-            op.getDecorList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
-                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
-        }
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitLimitOperator(LimitOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitOrderOperator(OrderOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitAssignOperator(AssignOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> assignedExprRefs = op.getExpressions();
-        List<LogicalVariable> assignedVars = op.getVariables();
-
-        // Maps assigning variables if assignment expressions are VariableReferenceExpressions.
-        for (int index = 0; index < assignedVars.size(); ++index) {
-            ILogicalExpression expr = assignedExprRefs.get(index).getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
-                LogicalVariable sourceVar = varExpr.getVariableReference();
-                updateVarMapping(sourceVar, assignedVars.get(index));
-                varsToRecover.remove(sourceVar);
-            }
-        }
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitSelectOperator(SelectOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitProjectOperator(ProjectOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        varsToRecover.removeAll(op.getVariables());
-        // Adds all missing variables that should propagates up.
-        op.getVariables().addAll(varsToRecover);
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitScriptOperator(ScriptOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
-    }
-
-    @Override
-    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        // Update the variable mappings
-        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = op.getVariableMappings();
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varTriples) {
-            updateVarMapping(triple.second, triple.first);
-            updateVarMapping(triple.third, triple.first);
-        }
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.remove(liveVars);
-        if (!varsToRecover.isEmpty()) {
-            op.setPropagatesInput(true);
-            return visitsInputs(op, varsToRecover);
-        }
-        return op;
-    }
-
-    @Override
-    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.retainAll(liveVars);
-        if (!varsToRecover.isEmpty()) {
-            op.setPropagateInput(true);
-            return visitsInputs(op, varsToRecover);
-        }
-        return op;
-    }
-
-    @Override
-    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    /**
-     * Wraps an AggregateOperator or RunningAggregateOperator with a group-by operator where
-     * the group-by keys are variables in varsToRecover.
-     * Note that the function here prevents this visitor being used to rewrite arbitrary query plans.
-     * Instead, it could only be used for rewriting a nested plan within a subplan operator.
-     *
-     * @param op
-     *            the logical operator for aggregate or running aggregate.
-     * @param varsToRecover
-     *            the set of variables that needs to preserve.
-     * @return the wrapped group-by operator if {@code varsToRecover} is not empty, and {@code op} otherwise.
-     * @throws AlgebricksException
-     */
-    private ILogicalOperator rewriteAggregateOperator(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.removeAll(liveVars);
-
-        GroupByOperator gbyOp = new GroupByOperator();
-        for (LogicalVariable varToRecover : varsToRecover) {
-            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
-            // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
-            LogicalVariable newVar = context.newVar();
-            gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
-                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
-            updateVarMapping(varToRecover, newVar);
-        }
-
-        NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
-        op.getInputs().clear();
-        op.getInputs().add(new MutableObject<ILogicalOperator>(nts));
-
-        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
-        ILogicalPlan nestedPlan = new ALogicalPlanImpl();
-        nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(op));
-        gbyOp.getNestedPlans().add(nestedPlan);
-        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
-
-        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(op, context);
-        return visitsInputs(gbyOp, varsToRecover);
-    }
-
-    private ILogicalOperator visitsInputs(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        if (op.getInputs().size() == 0 || varsToRecover.isEmpty()) {
-            return op;
-        }
-        Set<LogicalVariable> producedVars = new HashSet<>();
-        VariableUtilities.getProducedVariables(op, producedVars);
-        varsToRecover.removeAll(producedVars);
-        if (!varsToRecover.isEmpty()) {
-            if (op.getInputs().size() == 1) {
-                // Deals with single input operators.
-                ILogicalOperator newOp = op.getInputs().get(0).getValue().accept(this, varsToRecover);
-                op.getInputs().get(0).setValue(newOp);
-            } else {
-                // Deals with multi-input operators.
-                for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-                    ILogicalOperator child = childRef.getValue();
-                    Set<LogicalVariable> varsToRecoverInChild = new HashSet<>();
-                    VariableUtilities.getProducedVariablesInDescendantsAndSelf(child, varsToRecoverInChild);
-                    // Obtains the variables that this particular child should propagate.
-                    varsToRecoverInChild.retainAll(varsToRecover);
-                    ILogicalOperator newChild = child.accept(this, varsToRecoverInChild);
-                    childRef.setValue(newChild);
-                }
-            }
-        }
-        return op;
-    }
-
-    private void updateVarMapping(LogicalVariable oldVar, LogicalVariable newVar) {
-        if (oldVar.equals(newVar)) {
-            return;
-        }
-        LogicalVariable mappedVar = newVar;
-        if (inputVarToOutputVarMap.containsKey(newVar)) {
-            mappedVar = inputVarToOutputVarMap.get(newVar);
-            inputVarToOutputVarMap.remove(newVar);
-        }
-        inputVarToOutputVarMap.put(oldVar, mappedVar);
-    }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
index aa9848c..b86e8e9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -45,14 +44,16 @@
             throws AlgebricksException {
         List<Mutable<ILogicalOperator>> inputs1 = op.getInputs();
         List<Mutable<ILogicalOperator>> inputs2 = arg.getInputs();
-        if (inputs1.size() != inputs2.size())
-            return Boolean.FALSE;
+        if (inputs1.size() != inputs2.size()) {
+            return false;
+        }
         for (int i = 0; i < inputs1.size(); i++) {
             ILogicalOperator input1 = inputs1.get(i).getValue();
             ILogicalOperator input2 = inputs2.get(i).getValue();
             boolean isomorphic = isOperatorIsomorphicPlanSegment(input1, input2);
-            if (!isomorphic)
-                return Boolean.FALSE;
+            if (!isomorphic) {
+                return false;
+            }
         }
         return IsomorphismUtilities.isOperatorIsomorphic(op, arg);
     }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index f0bcd34..da252d4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -52,6 +52,9 @@
     }
 
     public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null) {
+            return null;
+        }
         return expr.accept(this, null);
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index ec7d7fe..2a92ead 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -348,7 +348,7 @@
     @Override
     public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, ILogicalOperator arg)
             throws AlgebricksException {
-        NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(arg));
+        NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(op.getDataSourceReference());
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 74e74a6..5cf30c7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -134,7 +134,7 @@
     @Override
     public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
             throws AlgebricksException {
-        return new NestedTupleSourceOperator(null);
+        return new NestedTupleSourceOperator(op.getDataSourceReference());
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index ddcf6c8..4b791c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -436,7 +436,9 @@
             return;
         }
         IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
-        env.substituteProducedVariable(arg.first, arg.second);
+        if (env != null) {
+            env.substituteProducedVariable(arg.first, arg.second);
+        }
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index f80e1cd..e82b4f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -167,6 +166,9 @@
                     }
                     break;
                 }
+                case RANDOM_PARTITION_EXCHANGE: {
+                    break;
+                }
                 default: {
                     throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
                 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 70eb544..0352f83 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -28,10 +28,8 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class VariableUtilities {
@@ -54,6 +52,14 @@
         op.accept(visitor, null);
     }
 
+    public static void getSubplanLocalLiveVariables(ILogicalOperator op, Collection<LogicalVariable> liveVariables)
+            throws AlgebricksException {
+        VariableUtilities.getLiveVariables(op, liveVariables);
+        Set<LogicalVariable> locallyProducedVars = new HashSet<>();
+        VariableUtilities.getProducedVariablesInDescendantsAndSelf(op, locallyProducedVars);
+        liveVariables.retainAll(locallyProducedVars);
+    }
+
     public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
             throws AlgebricksException {
         // DFS traversal
@@ -77,6 +83,21 @@
         substituteVariables(op, v1, v2, true, ctx);
     }
 
+    public static void substituteVariables(ILogicalOperator op, Map<LogicalVariable, LogicalVariable> varMap,
+            ITypingContext ctx) throws AlgebricksException {
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+            VariableUtilities.substituteVariables(op, entry.getKey(), entry.getValue(), ctx);
+        }
+    }
+
+    public static void substituteVariables(ILogicalOperator op,
+            List<Pair<LogicalVariable, LogicalVariable>> oldVarNewVarMapHistory, ITypingContext ctx)
+                    throws AlgebricksException {
+        for (Pair<LogicalVariable, LogicalVariable> entry : oldVarNewVarMapHistory) {
+            VariableUtilities.substituteVariables(op, entry.first, entry.second, ctx);
+        }
+    }
+
     public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
             LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
         for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
@@ -100,32 +121,4 @@
         return varSet.equals(varArgSet);
     }
 
-    /**
-     * Recursively modifies the query plan to make sure every variable in {@code varsToEnforce}
-     * be part of the output schema of {@code opRef}.
-     *
-     * @param opRef,
-     *            the operator to enforce.
-     * @param varsToEnforce,
-     *            the variables that needs to be live after the operator of {@code opRef}.
-     * @param context,
-     *            the optimization context.
-     * @return a map that maps a variable in {@code varsToEnforce} to yet-another-variable if
-     *         a mapping happens in the query plan under {@code opRef}, e.g., by grouping and assigning.
-     * @throws AlgebricksException
-     */
-    public static Map<LogicalVariable, LogicalVariable> enforceVariablesInDescendantsAndSelf(
-            Mutable<ILogicalOperator> opRef, Collection<LogicalVariable> varsToEnforce, IOptimizationContext context)
-                    throws AlgebricksException {
-        Set<LogicalVariable> copiedVarsToEnforce = new HashSet<>();
-        copiedVarsToEnforce.addAll(varsToEnforce);
-        // Rewrites the query plan
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(context);
-        ILogicalOperator result = opRef.getValue().accept(visitor, copiedVarsToEnforce);
-        opRef.setValue(result);
-        // Re-computes the type environment bottom up.
-        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(result, context);
-        return visitor.getInputVariableToOutputVariableMap();
-    }
-
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index f5ea5f1..09d2253 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -127,9 +127,8 @@
                     public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
                             IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
                             ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
-                        if (firstDeliveredPartitioning != null
-                                && firstDeliveredPartitioning.getPartitioningType() == requirements
-                                        .getPartitioningType()) {
+                        if (firstDeliveredPartitioning != null && firstDeliveredPartitioning
+                                .getPartitioningType() == requirements.getPartitioningType()) {
                             switch (requirements.getPartitioningType()) {
                                 case UNORDERED_PARTITIONED: {
                                     UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
@@ -139,8 +138,8 @@
                                     Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
                                     Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
                                     Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
-                                    List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent)) ? keysRightBranch
-                                            : keysLeftBranch;
+                                    List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
+                                            ? keysRightBranch : keysLeftBranch;
                                     List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
                                             : keysRightBranch;
                                     for (LogicalVariable r : uppreq.getColumnSet()) {
@@ -155,8 +154,8 @@
                                             j++;
                                         }
                                         if (!found) {
-                                            throw new IllegalStateException("Did not find a variable equivalent to "
-                                                    + r + " among " + keysFirst);
+                                            throw new IllegalStateException("Did not find a variable equivalent to " + r
+                                                    + " among " + keysFirst);
                                         }
                                         LogicalVariable v2 = keysSecond.get(j);
                                         EquivalenceClass ecFst = eqmap.get(v2);
@@ -167,6 +166,9 @@
                                                 break;
                                             }
                                         }
+                                        if (covered.equals(set1)) {
+                                            break;
+                                        }
                                     }
                                     if (!covered.equals(set1)) {
                                         throw new AlgebricksException("Could not modify " + requirements
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 66cb6b2..2c93cd4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -20,12 +20,13 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -54,6 +55,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 
 public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
 
@@ -155,7 +157,18 @@
                     AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
                     IPhysicalOperator pop2 = op2.getPhysicalOperator();
                     if (pop2 instanceof AbstractPreclusteredGroupByPOperator) {
-                        List<LogicalVariable> sndOrder = ((AbstractPreclusteredGroupByPOperator) pop2).getGbyColumns();
+                        List<LogicalVariable> gbyColumns = ((AbstractPreclusteredGroupByPOperator) pop2)
+                                .getGbyColumns();
+                        List<LogicalVariable> sndOrder = new ArrayList<>();
+                        sndOrder.addAll(gbyColumns);
+                        Set<LogicalVariable> freeVars = new HashSet<>();
+                        try {
+                            OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(op2, freeVars);
+                        } catch (AlgebricksException e) {
+                            throw new IllegalStateException(e);
+                        }
+                        // Only considers group key variables defined out-side the outer-most group-by operator.
+                        sndOrder.retainAll(freeVars);
                         groupProp.getColumnSet().addAll(sndOrder);
                         groupProp.getPreferredOrderEnforcer().addAll(sndOrder);
                         goon = false;
@@ -210,9 +223,8 @@
                     tl.add(((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference());
                     fdList.add(new FunctionalDependency(hd, tl));
                 }
-                if (allOk
-                        && PropertiesUtil.matchLocalProperties(localProps, props,
-                                new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
+                if (allOk && PropertiesUtil.matchLocalProperties(localProps, props,
+                        new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
                     localProps = props;
                 }
             }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index c0e20d6..03a8666 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -18,15 +18,17 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
+import java.util.ArrayList;
+
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -52,10 +54,9 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         IPartitioningProperty pp = new BroadcastPartitioningProperty(domain);
-        this.deliveredProperties = new StructuralPropertiesVector(pp, op2.getDeliveredPhysicalProperties()
-                .getLocalProperties());
+        // Broadcasts will destroy input local properties.
+        this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>());
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 34e3dc6..071ee72 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -18,12 +18,14 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -34,6 +36,8 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
@@ -106,14 +110,14 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
         int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
-                keysLeftBranch, env, context);
-        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
-                keysLeftBranch, env, context);
+        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper
+                .variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
+        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch,
+                env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
         int i = 0;
         IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -173,9 +177,10 @@
                     case INNER: {
                         opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft), predEvaluatorFactory);
+                                comparatorFactories, recDescriptor,
+                                new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+                                new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
+                                predEvaluatorFactory);
                         break;
                     }
                     case LEFT_OUTER: {
@@ -185,9 +190,10 @@
                         }
                         opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
+                                comparatorFactories, recDescriptor,
+                                new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+                                new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
+                                predEvaluatorFactory, true, nullWriterFactories);
                         break;
                     }
                     default: {
@@ -209,7 +215,31 @@
     @Override
     protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context)
             throws AlgebricksException {
-        return new LinkedList<ILocalStructuralProperty>();
+        List<ILocalStructuralProperty> deliveredLocalProperties = new ArrayList<ILocalStructuralProperty>();
+        // Inner join can kick off the "role reversal" optimization, which can kill data properties for the probe side.
+        if (kind == JoinKind.LEFT_OUTER) {
+            AbstractLogicalOperator probeOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            IPhysicalPropertiesVector probeSideProperties = probeOp.getPhysicalOperator().getDeliveredProperties();
+            List<ILocalStructuralProperty> probeSideLocalProperties = probeSideProperties.getLocalProperties();
+            if (probeSideLocalProperties != null) {
+                // The local grouping property in the probe side will be maintained
+                // and the local ordering property in the probe side will be turned into a local grouping property
+                // if the grouping variables (or sort columns) in the local property contain all the join key variables
+                // for the left branch:
+                // 1. in case spilling is not kicked off, the ordering property is maintained and hence local grouping property is maintained.
+                // 2. if spilling is kicked off, the grouping property is still maintained though the ordering property is destroyed.
+                for (ILocalStructuralProperty property : probeSideLocalProperties) {
+                    Set<LogicalVariable> groupingVars = new ListSet<LogicalVariable>();
+                    Set<LogicalVariable> leftBranchVars = new ListSet<LogicalVariable>();
+                    property.getVariables(groupingVars);
+                    leftBranchVars.addAll(getKeysLeftBranch());
+                    if (groupingVars.containsAll(leftBranchVars)) {
+                        deliveredLocalProperties.add(new LocalGroupingProperty(groupingVars));
+                    }
+                }
+            }
+        }
+        return deliveredLocalProperties;
     }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index a923944..7ff15d7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -105,8 +104,11 @@
             pp = IPartitioningProperty.UNPARTITIONED;
         }
 
-        List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
-        this.deliveredProperties = new StructuralPropertiesVector(pp, localProps);
+        // Nested loop join maintains the local structure property for the probe side.
+        AbstractLogicalOperator probeOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector probeSideProperties = probeOp.getPhysicalOperator().getDeliveredProperties();
+        List<ILocalStructuralProperty> probeSideLocalProperties = probeSideProperties.getLocalProperties();
+        this.deliveredProperties = new StructuralPropertiesVector(pp, probeSideLocalProperties);
     }
 
     @Override
@@ -125,7 +127,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
@@ -221,8 +223,8 @@
             } catch (AlgebricksException ae) {
                 throw new HyracksDataException(ae);
             }
-            boolean result = binaryBooleanInspector
-                    .getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+            boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
+                    p.getLength());
             if (result)
                 return 0;
             else
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index 42e6bcf..af0087d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -47,9 +47,10 @@
         this.domain = domain;
     }
 
+    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
                 opSchema, context);
         builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
@@ -62,9 +63,10 @@
         return false;
     }
 
+    @Override
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
-        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory();
         MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
@@ -77,8 +79,8 @@
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
-                .getDeliveredPhysicalProperties().getLocalProperties());
+        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
+                op2.getDeliveredPhysicalProperties().getLocalProperties());
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index b4569e4..b5099b1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -239,7 +240,35 @@
         for (Mutable<ILogicalOperator> children : op.getInputs()) {
             computeTypeEnvironmentBottomUp(children.getValue(), context);
         }
+        AbstractLogicalOperator abstractOp = (AbstractLogicalOperator) op;
+        if (abstractOp.hasNestedPlans()) {
+            for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+                for (Mutable<ILogicalOperator> rootRef : p.getRoots()) {
+                    computeTypeEnvironmentBottomUp(rootRef.getValue(), context);
+                }
+            }
+        }
         context.computeAndSetTypeEnvironmentForOperator(op);
     }
 
+    /***
+     * Is the operator <code>>op</code> an ancestor of any operators with tags in the set <code>tags</code>?
+     *
+     * @param op
+     * @param tags
+     * @return True if yes; false other wise.
+     */
+    public static boolean ancestorOfOperators(ILogicalOperator op, Set<LogicalOperatorTag> tags) {
+        LogicalOperatorTag opTag = op.getOperatorTag();
+        if (tags.contains(opTag)) {
+            return true;
+        }
+        for (Mutable<ILogicalOperator> children : op.getInputs()) {
+            if (ancestorOfOperators(children.getValue(), tags)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 8e254f0..d130d4c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -98,7 +98,8 @@
     }
 
     @Override
-    public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest, int destInputIndex) {
+    public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest,
+            int destInputIndex) {
         ArrayList<ILogicalOperator> outputs = outEdges.get(src);
         if (outputs == null) {
             outputs = new ArrayList<ILogicalOperator>();
@@ -144,7 +145,13 @@
         List<OperatorDescriptorId> roots = jobSpec.getRoots();
         setSpecifiedPartitionConstraints();
         for (OperatorDescriptorId rootId : roots) {
-            setPartitionConstraintsDFS(rootId, tgtConstraints, null);
+            setPartitionConstraintsBottomup(rootId, tgtConstraints, null, false);
+        }
+        for (OperatorDescriptorId rootId : roots) {
+            setPartitionConstraintsTopdown(rootId, tgtConstraints, null);
+        }
+        for (OperatorDescriptorId rootId : roots) {
+            setPartitionConstraintsBottomup(rootId, tgtConstraints, null, true);
         }
     }
 
@@ -161,7 +168,7 @@
         }
     }
 
-    private void setPartitionConstraintsDFS(OperatorDescriptorId opId,
+    private void setPartitionConstraintsTopdown(OperatorDescriptorId opId,
             Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp) {
         List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
         AlgebricksPartitionConstraint opConstraint = null;
@@ -172,9 +179,39 @@
                 org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
                         .getConnectorOperatorMap().get(cid);
                 IOperatorDescriptor src = p.getLeft().getLeft();
-                // DFS
-                setPartitionConstraintsDFS(src.getOperatorId(), tgtConstraints, opDesc);
+                TargetConstraint constraint = tgtConstraints.get(conn);
+                if (constraint != null) {
+                    if (constraint == TargetConstraint.SAME_COUNT) {
+                        opConstraint = partitionConstraintMap.get(opDesc);
+                        if (partitionConstraintMap.get(src) == null) {
+                            if (opConstraint != null) {
+                                partitionConstraintMap.put(src, opConstraint);
+                                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, src,
+                                        opConstraint);
+                            }
+                        }
+                    }
+                }
+                // Post Order DFS
+                setPartitionConstraintsTopdown(src.getOperatorId(), tgtConstraints, opDesc);
+            }
+        }
+    }
 
+    private void setPartitionConstraintsBottomup(OperatorDescriptorId opId,
+            Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp,
+            boolean finalPass) {
+        List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
+        AlgebricksPartitionConstraint opConstraint = null;
+        IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
+        if (opInputs != null) {
+            for (IConnectorDescriptor conn : opInputs) {
+                ConnectorDescriptorId cid = conn.getConnectorId();
+                org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
+                        .getConnectorOperatorMap().get(cid);
+                IOperatorDescriptor src = p.getLeft().getLeft();
+                // Pre-order DFS
+                setPartitionConstraintsBottomup(src.getOperatorId(), tgtConstraints, opDesc, finalPass);
                 TargetConstraint constraint = tgtConstraints.get(conn);
                 if (constraint != null) {
                     switch (constraint) {
@@ -200,12 +237,14 @@
                         opConstraint = new AlgebricksCountPartitionConstraint(1);
                     }
                 }
-                if (opConstraint == null) {
+                if (opConstraint == null && finalPass) {
                     opConstraint = clusterLocations;
                 }
             }
-            partitionConstraintMap.put(opDesc, opConstraint);
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+            if (opConstraint != null) {
+                partitionConstraintMap.put(opDesc, opConstraint);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+            }
         }
     }
 
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 16c4855..286d5c7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -67,7 +67,6 @@
     private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
     private Map<LogicalVariable, FunctionalDependency> recordToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
 
-    @SuppressWarnings("unchecked")
     private IMetadataProvider metadataProvider;
     private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
 
@@ -90,7 +89,8 @@
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+            PhysicalOptimizationConfig physicalOptimizationConfig,
+            LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
         this.varCounter = varCounter;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
         this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
@@ -100,29 +100,35 @@
         this.prettyPrintVisitor = prettyPrintVisitor;
     }
 
+    @Override
     public int getVarCounter() {
         return varCounter;
     }
 
+    @Override
     public void setVarCounter(int varCounter) {
         this.varCounter = varCounter;
     }
 
+    @Override
     public LogicalVariable newVar() {
         varCounter++;
         LogicalVariable var = new LogicalVariable(varCounter);
         return var;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public IMetadataProvider getMetadataProvider() {
         return metadataProvider;
     }
 
+    @Override
     public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider) {
         this.metadataProvider = metadataProvider;
     }
 
+    @Override
     public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
         HashSet<ILogicalOperator> operators = dontApply.get(rule);
         if (operators == null) {
@@ -132,6 +138,7 @@
         }
     }
 
+    @Override
     public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
         HashSet<ILogicalOperator> operators = dontApply.get(rule);
         if (operators == null) {
@@ -164,26 +171,30 @@
             }
         }
     }
-    
+
     @Override
     public void removeFromAlreadyCompared(ILogicalOperator op1) {
         alreadyCompared.remove(op1);
     }
 
+    @Override
     public void addNotToBeInlinedVar(LogicalVariable var) {
         notToBeInlinedVars.add(var);
     }
 
+    @Override
     public boolean shouldNotBeInlined(LogicalVariable var) {
         return notToBeInlinedVars.contains(var);
     }
 
+    @Override
     public void addPrimaryKey(FunctionalDependency pk) {
         assert (pk.getTail().size() == 1);
         LogicalVariable recordVar = pk.getTail().get(0);
         recordToPrimaryKey.put(recordVar, pk);
     }
 
+    @Override
     public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
         FunctionalDependency fd = recordToPrimaryKey.get(recordVar);
         if (fd == null) {
@@ -213,6 +224,12 @@
     }
 
     @Override
+    public void clearAllFDAndEquivalenceClasses() {
+        eqClassGlobalMap.clear();
+        fdGlobalMap.clear();
+    }
+
+    @Override
     public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op) {
         return logicalProps.get(op);
     }
@@ -232,10 +249,12 @@
         return varEvalSizeEnv;
     }
 
+    @Override
     public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
         return mergeAggregationExpressionFactory;
     }
 
+    @Override
     public PhysicalOptimizationConfig getPhysicalOptimizationConfig() {
         return physicalOptimizationConfig;
     }
@@ -295,7 +314,7 @@
             me.setValue(new FunctionalDependency(hd, tl));
         }
     }
-    
+
     @Override
     public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
         return prettyPrintVisitor;
diff --git a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java b/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
deleted file mode 100644
index d0676cf..0000000
--- a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-public class EnforceVariablesVisitorTest {
-
-    /**
-     * Tests the processing of project operator in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testProject() throws Exception {
-        // Constructs the input operator.
-        LogicalVariable var = new LogicalVariable(1);
-        List<LogicalVariable> inputVarList = new ArrayList<>();
-        inputVarList.add(var);
-        ProjectOperator projectOp = new ProjectOperator(inputVarList);
-
-        // Constructs the visitor.
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(2);
-        ProjectOperator op = (ProjectOperator) projectOp.accept(visitor,
-                Arrays.asList(new LogicalVariable[] { varToEnforce }));
-
-        // Checks the result.
-        List<LogicalVariable> expectedVars = Arrays.asList(new LogicalVariable[] { var, varToEnforce });
-        Assert.assertEquals(expectedVars, op.getVariables());
-        Assert.assertTrue(visitor.getInputVariableToOutputVariableMap().isEmpty());
-    }
-
-    /**
-     * Tests the processing of group-by operator in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testGroupby() throws Exception {
-        // Constructs the group-by operator.
-        LogicalVariable keyVar = new LogicalVariable(2);
-        LogicalVariable keyExprVar = new LogicalVariable(1);
-        GroupByOperator gbyOp = new GroupByOperator();
-        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
-
-        // Constructs the visitor.
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(3);
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        varsToEnforce.add(keyExprVar);
-        varsToEnforce.add(varToEnforce);
-        GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
-
-        // Checks the result.
-        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
-        expectedVarMap.put(keyExprVar, keyVar);
-        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
-        VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
-                .getValue();
-        Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
-    }
-
-    /**
-     * Tests the processing of aggregate operator in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testAggregate() throws Exception {
-        // Constructs the group-by operator.
-        List<LogicalVariable> aggVars = new ArrayList<>();
-        List<Mutable<ILogicalExpression>> aggExprRefs = new ArrayList<>();
-        AggregateOperator aggOp = new AggregateOperator(aggVars, aggExprRefs);
-
-        // Constructs the visitor.
-        LogicalVariable var = new LogicalVariable(3);
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        when(mockedContext.newVar()).thenReturn(var);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(2);
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        varsToEnforce.add(varToEnforce);
-        GroupByOperator op = (GroupByOperator) aggOp.accept(visitor, varsToEnforce);
-
-        // Checks the result.
-        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
-        expectedVarMap.put(varToEnforce, var);
-        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
-        VariableReferenceExpression keyExpr = (VariableReferenceExpression) op.getGroupByList().get(0).second
-                .getValue();
-        Assert.assertEquals(keyExpr.getVariableReference(), varToEnforce);
-        LogicalVariable expectedGbyVar = op.getGroupByList().get(0).first;
-        Assert.assertEquals(expectedGbyVar, var);
-    }
-
-    /**
-     * Tests the processing of two serial group-by operators in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testTwoGroupbys() throws Exception {
-        // Constructs the group-by operators.
-        LogicalVariable keyVar = new LogicalVariable(1);
-        LogicalVariable keyExprVar = new LogicalVariable(2);
-        GroupByOperator gbyOp = new GroupByOperator();
-        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
-        LogicalVariable keyVar2 = new LogicalVariable(2);
-        LogicalVariable keyExprVar2 = new LogicalVariable(3);
-        GroupByOperator gbyOp2 = new GroupByOperator();
-        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar2,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar2))));
-        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(gbyOp2));
-
-        // Constructs the visitor.
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(4);
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        varsToEnforce.add(keyExprVar2);
-        varsToEnforce.add(varToEnforce);
-        GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
-
-        // Checks the result.
-        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
-        expectedVarMap.put(keyExprVar2, keyVar);
-        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
-        VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
-                .getValue();
-        Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
-        GroupByOperator op2 = (GroupByOperator) op.getInputs().get(0).getValue();
-        VariableReferenceExpression decorVarExpr2 = (VariableReferenceExpression) op2.getDecorList().get(0).second
-                .getValue();
-        Assert.assertEquals(decorVarExpr2.getVariableReference(), varToEnforce);
-    }
-
-}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
index 9aaa3a7..2d2619e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/AbstractIntroduceGroupByCombinerRule.java
@@ -28,7 +28,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -45,6 +44,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -186,7 +186,7 @@
 
     private Pair<Boolean, ILogicalPlan> tryToPushSubplan(ILogicalPlan nestedPlan, GroupByOperator oldGbyOp,
             GroupByOperator newGbyOp, BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<Mutable<ILogicalOperator>> pushedRoots = new ArrayList<Mutable<ILogicalOperator>>();
         Set<SimilarAggregatesInfo> toReplaceSet = new HashSet<SimilarAggregatesInfo>();
         for (Mutable<ILogicalOperator> r : nestedPlan.getRoots()) {
@@ -215,7 +215,7 @@
                     VariableUtilities.getProducedVariables(rootRef.getValue(), newVars);
                 }
 
-                // Replaces variable exprs referring to the variables produced by newPlan by 
+                // Replaces variable exprs referring to the variables produced by newPlan by
                 // those produced by plan.
                 Iterator<LogicalVariable> originalVarIter = originalVars.iterator();
                 Iterator<LogicalVariable> newVarIter = newVars.iterator();
@@ -246,7 +246,7 @@
     private boolean tryToPushRoot(Mutable<ILogicalOperator> root, GroupByOperator oldGbyOp, GroupByOperator newGbyOp,
             BookkeepingInfo bi, List<LogicalVariable> gbyVars, IOptimizationContext context,
             List<Mutable<ILogicalOperator>> toPushAccumulate, Set<SimilarAggregatesInfo> toReplaceSet)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) root.getValue();
         if (op1.getOperatorTag() != LogicalOperatorTag.AGGREGATE) {
             return false;
@@ -273,6 +273,11 @@
         } else {
             GroupByOperator nestedGby = (GroupByOperator) op3;
             List<LogicalVariable> gbyVars2 = nestedGby.getGbyVarList();
+            Set<LogicalVariable> freeVars = new HashSet<>();
+            // Removes non-free variables defined in the nested plan.
+            OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(nestedGby, freeVars);
+            gbyVars2.retainAll(freeVars);
+
             List<LogicalVariable> concatGbyVars = new ArrayList<LogicalVariable>(gbyVars);
             concatGbyVars.addAll(gbyVars2);
             for (ILogicalPlan p : nestedGby.getNestedPlans()) {
@@ -288,50 +293,99 @@
              * Push the nested pipeline which provides the input to the nested group operator into newGbyOp (the combined gby op).
              * The change is to fix asterixdb issue 782.
              */
-            Mutable<ILogicalOperator> nestedGbyInputRef = nestedGby.getInputs().get(0);
-            Mutable<ILogicalOperator> startOfPipelineRef = nestedGbyInputRef;
-            if (startOfPipelineRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            // Finds the reference of the bottom-most operator in the pipeline that
+            // should not be pushed to the combiner group-by.
+            Mutable<ILogicalOperator> currentOpRef = new MutableObject<ILogicalOperator>(nestedGby);
+            Mutable<ILogicalOperator> bottomOpRef = findBottomOpRefStayInOldGby(currentOpRef);
+
+            // Adds the used variables in the pipeline from <code>currentOpRef</code> to <code>bottomOpRef</code>
+            // into the group-by keys for the introduced combiner group-by operator.
+            Set<LogicalVariable> usedVars = collectUsedFreeVariables(currentOpRef, bottomOpRef);
+            for (LogicalVariable usedVar : usedVars) {
+                if (!concatGbyVars.contains(usedVar)) {
+                    concatGbyVars.add(usedVar);
+                }
+            }
+
+            // Retains the nested pipeline above the identified operator in the old group-by operator.
+            // Pushes the nested pipeline under the select operator into the new group-by operator.
+            Mutable<ILogicalOperator> oldNtsRef = findNtsRef(currentOpRef);
+            ILogicalOperator opToCombiner = bottomOpRef.getValue().getInputs().get(0).getValue();
+            if (opToCombiner.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                // No pipeline other than the aggregate operator needs to push to combiner.
                 return true;
             }
-
-            // move down the nested pipeline to find the start of the pipeline right upon the nested-tuple-source
-            boolean hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
-                    .getValue());
-            while (startOfPipelineRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
-                startOfPipelineRef = startOfPipelineRef.getValue().getInputs().get(0);
-                hasIsNullFunction = OperatorPropertiesUtil.isNullTest((AbstractLogicalOperator) startOfPipelineRef
-                        .getValue());
-            }
-            //keep the old nested-tuple-source
-            Mutable<ILogicalOperator> oldNts = startOfPipelineRef.getValue().getInputs().get(0);
-
-            //move down the nested op in the new gby operator
-            Mutable<ILogicalOperator> newGbyNestedOpRef = toPushAccumulate.get(0);
-            while (newGbyNestedOpRef.getValue().getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE) {
-                newGbyNestedOpRef = newGbyNestedOpRef.getValue().getInputs().get(0);
-            }
-
-            //insert the pipeline before nested gby into the new (combiner) gby's nested plan on top of the nested-tuple-source
-            startOfPipelineRef.getValue().getInputs().set(0, newGbyNestedOpRef.getValue().getInputs().get(0));
-            newGbyNestedOpRef.getValue().getInputs().set(0, nestedGbyInputRef);
-
-            //in the old gby operator, remove the nested pipeline since it is already pushed to the combiner gby
-            nestedGby.getInputs().set(0, oldNts);
-            List<LogicalVariable> aggProducedVars = new ArrayList<LogicalVariable>();
-            VariableUtilities.getProducedVariables(toPushAccumulate.get(0).getValue(), aggProducedVars);
-
-            if (hasIsNullFunction && aggProducedVars.size() != 0) {
-                // if the old nested pipeline contains a not-null-check, we need to convert it to a not-system-null-check in the non-local gby
-                processNullTest(context, nestedGby, aggProducedVars);
-            }
-
+            bottomOpRef.getValue().getInputs().set(0, new MutableObject<ILogicalOperator>(oldNtsRef.getValue()));
+            Mutable<ILogicalOperator> newGbyNestedOpRef = findNtsRef(toPushAccumulate.get(0));
+            NestedTupleSourceOperator newNts = (NestedTupleSourceOperator) newGbyNestedOpRef.getValue();
+            newGbyNestedOpRef.setValue(opToCombiner);
+            oldNtsRef.setValue(newNts);
             return true;
         }
     }
 
     /**
+     * Find the set of used free variables along the pipeline from <code>topOpRef</code> (exclusive)
+     * to <code>bottomOpRef</code> (inclusive).
+     *
+     * @param topOpRef,
+     *            the top root of the pipeline.
+     * @param bottomOpRef,
+     *            the bottom of the pipeline.
+     * @return the set of used variables.
+     * @throws AlgebricksException
+     */
+    private Set<LogicalVariable> collectUsedFreeVariables(Mutable<ILogicalOperator> topOpRef,
+            Mutable<ILogicalOperator> bottomOpRef) throws AlgebricksException {
+        Set<LogicalVariable> usedVars = new HashSet<>();
+        Mutable<ILogicalOperator> currentOpRef = topOpRef;
+        while (currentOpRef != bottomOpRef) {
+            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+            VariableUtilities.getUsedVariables(currentOpRef.getValue(), usedVars);
+        }
+        Set<LogicalVariable> freeVars = new HashSet<>();
+        OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) topOpRef.getValue(), freeVars);
+        usedVars.retainAll(freeVars);
+        return usedVars;
+    }
+
+    /**
+     * Find the reference of a nested tuple source operator in the query pipeline rooted at <code>currentOpRef</code>
+     *
+     * @param currentOpRef
+     * @return the reference of a nested tuple source operator
+     */
+    private Mutable<ILogicalOperator> findNtsRef(Mutable<ILogicalOperator> currentOpRef) {
+        while (currentOpRef.getValue().getInputs().size() > 0) {
+            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        }
+        return currentOpRef;
+    }
+
+    /**
+     * Find the bottom-most nested operator reference in the query pipeline rooted at <code>currentOpRef</code>
+     * that cannot be pushed into the combiner group-by operator.
+     *
+     * @param currentOpRef
+     * @return the bottom-most reference of a select operator
+     */
+    private Mutable<ILogicalOperator> findBottomOpRefStayInOldGby(Mutable<ILogicalOperator> currentOpRef)
+            throws AlgebricksException {
+        Mutable<ILogicalOperator> bottomOpRef = currentOpRef;
+        while (currentOpRef.getValue().getInputs().size() > 0) {
+            Set<LogicalVariable> producedVars = new HashSet<>();
+            VariableUtilities.getProducedVariables(currentOpRef.getValue(), producedVars);
+            if (currentOpRef.getValue().getOperatorTag() == LogicalOperatorTag.SELECT || !producedVars.isEmpty()) {
+                bottomOpRef = currentOpRef;
+            }
+            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        }
+        return bottomOpRef;
+    }
+
+    /**
      * Deal with the case where the nested plan in the combiner gby operator has a null-test before invoking aggregation functions.
-     * 
+     *
      * @param context
      *            The optimization context.
      * @param nestedGby
@@ -341,4 +395,4 @@
      */
     protected abstract void processNullTest(IOptimizationContext context, GroupByOperator nestedGby,
             List<LogicalVariable> aggregateVarsProducedByCombiner);
-}
\ No newline at end of file
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 2181efa..8bf1ad5 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -104,7 +104,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         // wait for the physical operators to be set first
         if (op.getPhysicalOperator() == null) {
@@ -166,18 +167,17 @@
         List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<IPartitioningProperty>();
         for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
             AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
-            deliveredPartitioningPropertiesFromChildren.add(child.getDeliveredPhysicalProperties()
-                    .getPartitioningProperty());
+            deliveredPartitioningPropertiesFromChildren
+                    .add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
         }
         int partitioningCompatibleChild = 0;
         for (int i = 0; i < op.getInputs().size(); i++) {
             IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
-            if (reqdProperties == null
-                    || reqdProperties[i] == null
-                    || reqdProperties[i].getPartitioningProperty() == null
-                    || deliveredPropertyFromChild == null
-                    || reqdProperties[i].getPartitioningProperty().getPartitioningType() != deliveredPartitioningPropertiesFromChildren
-                            .get(i).getPartitioningType()) {
+            if (reqdProperties == null || reqdProperties[i] == null
+                    || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null
+                    || reqdProperties[i].getPartitioningProperty()
+                            .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i)
+                                    .getPartitioningType()) {
                 continue;
             }
             IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
@@ -248,8 +248,8 @@
             AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
             IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
 
-            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator()
-                    + ": " + delivered + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER
+                    .finest(">>>> Properties delivered by " + child.getPhysicalOperator() + ": " + delivered + "\n");
             IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
             // Coordinates requirements by looking at the firstDeliveredPartitioning.
             Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
@@ -258,8 +258,8 @@
             IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
                     requiredProperty.getLocalProperties());
 
-            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator()
-                    + ": " + rqd + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER
+                    .finest(">>>> Required properties for " + child.getPhysicalOperator() + ": " + rqd + "\n");
             // The partitioning property of reqdProperties[childIndex] could be updated here because
             // rqd.getPartitioningProperty() is the same object instance as requiredProperty.getPartitioningProperty().
             IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
@@ -273,7 +273,8 @@
                 changed = true;
                 addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
 
-                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex).getValue());
+                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(childIndex)
+                        .getValue());
 
                 if (newChild != child) {
                     delivered = newChild.getDeliveredPhysicalProperties();
@@ -308,8 +309,8 @@
 
         if (opIsRedundantSort) {
             if (AlgebricksConfig.DEBUG) {
-                AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Removing redundant SORT operator "
-                        + op.getPhysicalOperator() + "\n");
+                AlgebricksConfig.ALGEBRICKS_LOGGER
+                        .fine(">>>> Removing redundant SORT operator " + op.getPhysicalOperator() + "\n");
                 printOp(op);
             }
             changed = true;
@@ -332,7 +333,7 @@
 
     private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
             IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         IPhysicalPropertiesVector newDelivered = newChild.getDeliveredPhysicalProperties();
 
         Map<LogicalVariable, EquivalenceClass> newChildEqClasses = context.getEquivalenceClassMap(newChild);
@@ -343,8 +344,8 @@
             newChildEqClasses = context.getEquivalenceClassMap(newChild);
             newChildFDs = context.getFDList(newChild);
         }
-        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for new op. "
-                + newChild.getPhysicalOperator() + ": " + required + "\n");
+        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(
+                ">>>> Required properties for new op. " + newChild.getPhysicalOperator() + ": " + required + "\n");
 
         return newDelivered.getUnsatisfiedPropertiesFrom(required, mayExpandPartitioningProperties, newChildEqClasses,
                 newChildFDs);
@@ -418,16 +419,16 @@
             IPhysicalPropertiesVector diffOfProperties, IOptimizationContext context) {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.ORDER
-                || (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT && op
-                        .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT)
+                || (op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.STABLE_SORT
+                        && op.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.IN_MEMORY_STABLE_SORT)
                 || delivered.getLocalProperties() == null) {
             return false;
         }
         AbstractStableSortPOperator sortOp = (AbstractStableSortPOperator) op.getPhysicalOperator();
         sortOp.computeLocalProperties(op);
         ILocalStructuralProperty orderProp = sortOp.getOrderProperty();
-        return PropertiesUtil.matchLocalProperties(Collections.singletonList(orderProp),
-                delivered.getLocalProperties(), context.getEquivalenceClassMap(op), context.getFDList(op));
+        return PropertiesUtil.matchLocalProperties(Collections.singletonList(orderProp), delivered.getLocalProperties(),
+                context.getEquivalenceClassMap(op), context.getFDList(op));
     }
 
     private void addEnforcers(AbstractLogicalOperator op, int childIndex,
@@ -455,8 +456,8 @@
     private void addLocalEnforcers(AbstractLogicalOperator op, int i, List<ILocalStructuralProperty> localProperties,
             boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
         if (AlgebricksConfig.DEBUG) {
-            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Adding local enforcers for local props = " + localProperties
-                    + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER
+                    .fine(">>>> Adding local enforcers for local props = " + localProperties + "\n");
         }
 
         if (localProperties == null || localProperties.isEmpty()) {
@@ -475,8 +476,8 @@
                 }
                 case LOCAL_GROUPING_PROPERTY: {
                     LocalGroupingProperty g = (LocalGroupingProperty) prop;
-                    Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null) ? g
-                            .getPreferredOrderEnforcer() : g.getColumnSet();
+                    Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null)
+                            ? g.getPreferredOrderEnforcer() : g.getColumnSet();
                     List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
                     for (LogicalVariable v : vars) {
                         OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
@@ -502,7 +503,7 @@
 
     private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
             Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
         for (LocalOrderProperty orderProperty : oList) {
             for (OrderColumn oc : orderProperty.getOrderColumns()) {
@@ -539,8 +540,8 @@
                         pop = new RandomMergeExchangePOperator();
                     } else {
                         if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
-                            IRangeMap rangeMap = (IRangeMap) op.getAnnotations().get(
-                                    OperatorAnnotations.USE_RANGE_CONNECTOR);
+                            IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
+                                    .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
                             pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
                         } else {
                             OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
@@ -574,7 +575,8 @@
                     break;
                 }
                 case ORDERED_PARTITIONED: {
-                    pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain, null);
+                    pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain,
+                            null);
                     break;
                 }
                 case BROADCAST: {
@@ -600,8 +602,8 @@
             OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(exchg, context);
             context.computeAndSetTypeEnvironmentForOperator(exchg);
             if (AlgebricksConfig.DEBUG) {
-                AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added partitioning enforcer "
-                        + exchg.getPhysicalOperator() + ".\n");
+                AlgebricksConfig.ALGEBRICKS_LOGGER
+                        .fine(">>>> Added partitioning enforcer " + exchg.getPhysicalOperator() + ".\n");
                 printOp((AbstractLogicalOperator) op);
             }
         }
@@ -649,8 +651,8 @@
         newOp.recomputeSchema();
         newOp.computeDeliveredPhysicalProperties(context);
         context.computeAndSetTypeEnvironmentForOperator(newOp);
-        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + newOp.getPhysicalOperator()
-                + ": " + newOp.getDeliveredPhysicalProperties() + "\n");
+        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + newOp.getPhysicalOperator() + ": "
+                + newOp.getDeliveredPhysicalProperties() + "\n");
 
         PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(newOp, context);
     }
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 92a3691..f13187f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -28,7 +28,6 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -38,7 +37,6 @@
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -62,7 +60,8 @@
     private int lastUsedClusterId = 0;
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
                 && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
@@ -153,16 +152,17 @@
             candidate = group.get(0);
             ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
             rop.setPhysicalOperator(new ReplicatePOperator());
-            rop.setExecutionMode(ExecutionMode.PARTITIONED);
             Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
             AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
             List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
 
+            rop.setExecutionMode(((AbstractLogicalOperator) candidate.getValue()).getExecutionMode());
             if (aopCandidate.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
                 rop.getInputs().add(candidate);
             } else {
                 AbstractLogicalOperator beforeExchange = new ExchangeOperator();
                 beforeExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+                beforeExchange.setExecutionMode(rop.getExecutionMode());
                 Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<ILogicalOperator>(beforeExchange);
                 beforeExchange.getInputs().add(candidate);
                 context.computeAndSetTypeEnvironmentForOperator(beforeExchange);
@@ -179,6 +179,7 @@
                 } else {
                     AbstractLogicalOperator exchange = new ExchangeOperator();
                     exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+                    exchange.setExecutionMode(rop.getExecutionMode());
                     MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
                     exchange.getInputs().add(ropRef);
                     rop.getOutputs().add(exchangeRef);
@@ -203,11 +204,14 @@
                 }
 
                 AbstractLogicalOperator assignOperator = new AssignOperator(liveVars, assignExprs);
+                assignOperator.setExecutionMode(rop.getExecutionMode());
                 assignOperator.setPhysicalOperator(new AssignPOperator());
                 AbstractLogicalOperator projectOperator = new ProjectOperator(liveVars);
                 projectOperator.setPhysicalOperator(new StreamProjectPOperator());
+                projectOperator.setExecutionMode(rop.getExecutionMode());
                 AbstractLogicalOperator exchOp = new ExchangeOperator();
                 exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
+                exchOp.setExecutionMode(rop.getExecutionMode());
                 exchOp.getInputs().add(ropRef);
                 MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
                 rop.getOutputs().add(exchOpRef);
@@ -239,6 +243,7 @@
                     } else {
                         AbstractLogicalOperator exchg = new ExchangeOperator();
                         exchg.setPhysicalOperator(new OneToOneExchangePOperator());
+                        exchg.setExecutionMode(childOp.getExecutionMode());
                         exchg.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
                         parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(exchg));
                         context.computeAndSetTypeEnvironmentForOperator(exchg);
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
index a90d347..1cefead 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
@@ -26,7 +26,6 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -45,7 +44,7 @@
 /**
  * Pushes projections through its input operator, provided that operator does
  * not produce the projected variables.
- * 
+ *
  * @author Nicola
  */
 public class PushProjectDownRule implements IAlgebraicRewriteRule {
@@ -56,7 +55,8 @@
     }
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.PROJECT) {
             return false;
@@ -79,7 +79,7 @@
 
     private static Pair<Boolean, Boolean> pushThroughOp(HashSet<LogicalVariable> toPush,
             Mutable<ILogicalOperator> opRef2, ILogicalOperator initialOp, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<LogicalVariable> initProjectList = new ArrayList<LogicalVariable>(toPush);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
         do {
@@ -111,7 +111,7 @@
 
         boolean canCommuteProjection = initProjectList.containsAll(toPush) && initProjectList.containsAll(produced2)
                 && initProjectList.containsAll(used2);
-        // if true, we can get rid of the initial projection
+                // if true, we can get rid of the initial projection
 
         // get rid of useless decor vars.
         if (!canCommuteProjection && op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
@@ -191,7 +191,7 @@
     // It does not try to push above another Projection.
     private static boolean pushAllProjectionsOnTopOf(Collection<LogicalVariable> toPush,
             Mutable<ILogicalOperator> opRef, IOptimizationContext context, ILogicalOperator initialOp)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         if (toPush.isEmpty()) {
             return false;
         }
@@ -201,15 +201,8 @@
             return false;
         }
 
-        switch (op.getOperatorTag()) {
-            case EXCHANGE: {
-                opRef = opRef.getValue().getInputs().get(0);
-                op = (AbstractLogicalOperator) opRef.getValue();
-                break;
-            }
-            case PROJECT: {
-                return false;
-            }
+        if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+            return false;
         }
 
         ProjectOperator pi2 = new ProjectOperator(new ArrayList<LogicalVariable>(toPush));
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
index daf27ac..93af981 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/IntroduceLeftOuterJoinForSubplanRule.java
@@ -21,7 +21,6 @@
 import java.util.Iterator;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -37,7 +36,8 @@
 public class IntroduceLeftOuterJoinForSubplanRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -74,6 +74,7 @@
                 InnerJoinOperator join = (InnerJoinOperator) op1;
                 Mutable<ILogicalOperator> leftRef = join.getInputs().get(0);
                 Mutable<ILogicalOperator> rightRef = join.getInputs().get(1);
+
                 Mutable<ILogicalOperator> ntsRef = getNtsAtEndOfPipeline(leftRef);
                 if (ntsRef == null) {
                     ntsRef = getNtsAtEndOfPipeline(rightRef);
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
index 1cda8a2..e034af0 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/RandomPartitionComputerFactory.java
@@ -25,29 +25,21 @@
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class RandomPartitionComputerFactory implements
-		ITuplePartitionComputerFactory {
+public class RandomPartitionComputerFactory implements ITuplePartitionComputerFactory {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private final int domainCardinality;
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
 
-	public RandomPartitionComputerFactory(int domainCardinality) {
-		this.domainCardinality = domainCardinality;
-	}
+            private final Random random = new Random();
 
-	@Override
-	public ITuplePartitionComputer createPartitioner() {
-		return new ITuplePartitionComputer() {
-
-			private final Random random = new Random();
-
-			@Override
-			public int partition(IFrameTupleAccessor accessor, int tIndex,
-					int nParts) throws HyracksDataException {
-				return random.nextInt(domainCardinality);
-			}
-		};
-	}
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                return random.nextInt(nParts);
+            }
+        };
+    }
 
 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 7a3a019..4d73fa5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -52,10 +52,13 @@
         return new IFrameWriter() {
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                buffer.mark();
+                // Record the current position, instead of using buffer.mark().
+                // The latter will be problematic because epWriters[i].nextFrame(buffer)
+                // can flip or clear the buffer.
+                int pos = buffer.position();
                 for (int i = 0; i < epWriters.length; ++i) {
                     if (i != 0) {
-                        buffer.reset();
+                        buffer.position(pos);
                     }
                     epWriters[i].nextFrame(buffer);
                 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index f8a8a67..38a1ebc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,11 +37,17 @@
 
 public class MaterializerTaskState extends AbstractStateObject {
     private RunFileWriter out;
+    private final AtomicInteger numConsumers = new AtomicInteger(1);
 
     public MaterializerTaskState(JobId jobId, TaskId taskId) {
         super(jobId, taskId);
     }
 
+    public MaterializerTaskState(JobId jobId, TaskId taskId, int numConsumers) {
+        super(jobId, taskId);
+        this.numConsumers.set(numConsumers);
+    }
+
     @Override
     public void toBytes(DataOutput out) throws IOException {
 
@@ -67,7 +74,7 @@
     }
 
     public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
-        RunFileReader in = out.createDeleteOnCloseReader();
+        RunFileReader in = out.createReader();
         try {
             writer.open();
             try {
@@ -75,6 +82,8 @@
                 while (in.nextFrame(frame)) {
                     writer.nextFrame(frame.getBuffer());
                 }
+            } catch (Throwable th) {
+                throw new HyracksDataException(th);
             } finally {
                 in.close();
             }
@@ -83,10 +92,9 @@
             throw new HyracksDataException(th);
         } finally {
             writer.close();
+            if (numConsumers.decrementAndGet() == 0) {
+                out.getFileReference().delete();
+            }
         }
     }
-
-    public void deleteFile() {
-        out.getFileReference().delete();
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index feff13c..82c62a5 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -43,10 +43,10 @@
     private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
     private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
 
-    private boolean[] outputMaterializationFlags;
-    private boolean requiresMaterialization;
-    private int numberOfNonMaterializedOutputs = 0;
-    private int numberOfActiveMaterializeReaders = 0;
+    private final boolean[] outputMaterializationFlags;
+    private final boolean requiresMaterialization;
+    private final int numberOfNonMaterializedOutputs;
+    private final int numberOfMaterializedOutputs;
 
     public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
         this(spec, rDesc, outputArity, new boolean[outputArity]);
@@ -59,14 +59,23 @@
             recordDescriptors[i] = rDesc;
         }
         this.outputMaterializationFlags = outputMaterializationFlags;
-        requiresMaterialization = false;
+
+        boolean reqMaterialization = false;
+        int matOutputs = 0;
+        int nonMatOutputs = 0;
         for (boolean flag : outputMaterializationFlags) {
             if (flag) {
-                requiresMaterialization = true;
-                break;
+                reqMaterialization = true;
+                matOutputs++;
+            } else {
+                nonMatOutputs++;
             }
         }
 
+        this.requiresMaterialization = reqMaterialization;
+        this.numberOfMaterializedOutputs = matOutputs;
+        this.numberOfNonMaterializedOutputs = nonMatOutputs;
+
     }
 
     @Override
@@ -75,27 +84,17 @@
                 new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
-        int taskOutputIndex = 0;
+        int pipelineOutputIndex = 0;
+        int activityId = MATERIALIZE_READER_ACTIVITY_ID;
         for (int i = 0; i < outputArity; i++) {
-            if (!outputMaterializationFlags[i]) {
-                builder.addTargetEdge(i, sma, taskOutputIndex);
-                taskOutputIndex++;
-            }
-        }
-        numberOfNonMaterializedOutputs = taskOutputIndex;
-
-        if (requiresMaterialization) {
-            int activityId = MATERIALIZE_READER_ACTIVITY_ID;
-            for (int i = 0; i < outputArity; i++) {
-                if (outputMaterializationFlags[i]) {
-                    MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
-                            new ActivityId(odId, activityId));
-                    builder.addActivity(this, mra);
-                    builder.addTargetEdge(i, mra, 0);
-                    builder.addBlockingEdge(sma, mra);
-                    numberOfActiveMaterializeReaders++;
-                    activityId++;
-                }
+            if (outputMaterializationFlags[i]) {
+                MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+                        new ActivityId(odId, activityId++));
+                builder.addActivity(this, mra);
+                builder.addBlockingEdge(sma, mra);
+                builder.addTargetEdge(i, mra, 0);
+            } else {
+                builder.addTargetEdge(i, sma, pipelineOutputIndex++);
             }
         }
     }
@@ -119,7 +118,7 @@
                 public void open() throws HyracksDataException {
                     if (requiresMaterialization) {
                         state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
-                                new TaskId(getActivityId(), partition));
+                                new TaskId(getActivityId(), partition), numberOfMaterializedOutputs);
                         state.open(ctx);
                     }
                     for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
@@ -215,15 +214,6 @@
                     state.writeOut(writer, new VSizeFrame(ctx));
                 }
 
-                @Override
-                public void deinitialize() throws HyracksDataException {
-                    numberOfActiveMaterializeReaders--;
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
-                    if (numberOfActiveMaterializeReaders == 0) {
-                        state.deleteFile();
-                    }
-                }
             };
         }
     }