[ASTERIXDB-3197][COMP] outer join support for CBO

Change-Id: I70067ceb9817965a79d45f786548c6496a9e3853
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17557
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: <murali.krishna@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index 9c69902..46fe70d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -871,7 +871,7 @@
             IOptimizationContext context, boolean isLeftOuterJoin, boolean isLeftOuterJoinWithSpecialGroupBy,
             IAlgebricksConstantValue leftOuterMissingValue, ILogicalOperator indexSearchOp,
             LogicalVariable newMissingNullPlaceHolderVar, Mutable<ILogicalExpression> conditionRef, Dataset dataset,
-            Index chosenIndex) throws AlgebricksException {
+            Index chosenIndex, AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
         boolean isIndexOnlyPlan = analysisCtx.getIndexOnlyPlanInfo().getFirst();
         List<LogicalVariable> probePKVars = null;
         ILogicalOperator finalIndexSearchOp = indexSearchOp;
@@ -922,6 +922,9 @@
                     }
                 }
                 if (probePKVars == null || probePKVars.isEmpty()) {
+                    if (funcExpr != null) {
+                        conditionRef.setValue(funcExpr);
+                    }
                     return false;
                 }
                 if (isIndexOnlyPlan) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/ArrayBTreeAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/ArrayBTreeAccessMethod.java
index d01a2cb..7eb722d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/ArrayBTreeAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/ArrayBTreeAccessMethod.java
@@ -195,7 +195,7 @@
 
         return AccessMethodUtils.finalizeJoinPlanTransformation(afterJoinRefs, joinRef, indexSubTree, probeSubTree,
                 analysisCtx, context, isLeftOuterJoin, isLeftOuterJoinWithSpecialGroupBy, leftOuterMissingValue,
-                indexSearchOp, newNullPlaceHolderVar, conditionRef, dataset, chosenIndex);
+                indexSearchOp, newNullPlaceHolderVar, conditionRef, dataset, chosenIndex, null);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 2a94359..ee3fee0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -307,7 +307,7 @@
 
         return AccessMethodUtils.finalizeJoinPlanTransformation(afterJoinRefs, joinRef, indexSubTree, probeSubTree,
                 analysisCtx, context, isLeftOuterJoin, isLeftOuterJoinWithSpecialGroupBy, leftOuterMissingValue,
-                indexSearchOp, newMissingNullPlaceHolderVar, conditionRef, dataset, chosenIndex);
+                indexSearchOp, newMissingNullPlaceHolderVar, conditionRef, dataset, chosenIndex, funcExpr);
     }
 
     /**
@@ -788,6 +788,7 @@
             unnestMapOp.setExecutionMode(ExecutionMode.PARTITIONED);
             unnestMapOp.setSourceLocation(dataSourceOp.getSourceLocation());
             unnestMapOp.getInputs().add(new MutableObject<>(inputOp));
+            context.computeAndSetTypeEnvironmentForOperator(unnestMapOp);
             indexSearchOp = unnestMapOp;
 
             // Adds equivalence classes --- one equivalent class between a primary key
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/RTreeAccessMethod.java
index 3de78f7..954a445 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -377,7 +377,7 @@
 
         return AccessMethodUtils.finalizeJoinPlanTransformation(afterJoinRefs, joinRef, indexSubTree, probeSubTree,
                 analysisCtx, context, isLeftOuterJoin, isLeftOuterJoinWithSpecialGroupBy, leftOuterMissingValue,
-                indexSearchOp, newMissingNullPlaceHolderVar, conditionRef, dataset, chosenIndex);
+                indexSearchOp, newMissingNullPlaceHolderVar, conditionRef, dataset, chosenIndex, funcExpr);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index d0a1797..3a482a4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.optimizer.rules.cbo;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -30,10 +31,13 @@
 import org.apache.asterix.common.annotations.SkipSecondaryIndexSearchExpressionAnnotation;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Quadruple;
+import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -68,6 +72,19 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final JoinEnum joinEnum;
+    private int leafInputNumber;
+    List<ILogicalOperator> newJoinOps;
+    boolean[] unUsedJoinOps;
+    List<JoinOperator> allJoinOps; // can be inner join or left outer join
+    HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap;
+    HashMap<LogicalVariable, Integer> varLeafInputIds;
+    // The data scan operators. Will be in the order of the from clause.
+    // Important for position ordering when assigning bits to join expressions.
+    List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps;
+    List<Triple<Integer, Integer, Boolean>> buildSets; // the first is the bits and the second is the number of tables.
+    List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList;
+    List<AssignOperator> assignOps;
+    List<ILogicalExpression> assignJoinExprs; // These are the join expressions below the assign operator.
 
     public EnumerateJoinsRule(JoinEnum joinEnum) {
         this.joinEnum = joinEnum;
@@ -86,12 +103,14 @@
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
+
         boolean cboMode = this.getCBOMode(context);
         boolean cboTestMode = this.getCBOTestMode(context);
 
         if (!(cboMode || cboTestMode)) {
             return false;
         }
+
         // If we reach here, then either cboMode or cboTestMode is true.
         // If cboTestMode is true, then we use predefined cardinalities for datasets for asterixdb regression tests.
         // If cboMode is true, then all datasets need to have samples, otherwise the check in doAllDataSourcesHaveSamples()
@@ -106,22 +125,30 @@
             return false;
         }
 
-        List<ILogicalOperator> joinOps = new ArrayList<>();
-        HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap = new HashMap<>();
+        //joinOps = new ArrayList<>();
+        allJoinOps = new ArrayList<>();
+        newJoinOps = new ArrayList<>();
+        joinLeafInputsHashMap = new HashMap<>();
+        varLeafInputIds = new HashMap<>();
         // The data scan operators. Will be in the order of the from clause.
         // Important for position ordering when assigning bits to join expressions.
-        List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps = new ArrayList<>();
-        List<AssignOperator> assignOps = new ArrayList<>();
+        emptyTupleAndDataSourceOps = new ArrayList<>();
+        outerJoinsDependencyList = new ArrayList<>();
+        assignOps = new ArrayList<>();
+        assignJoinExprs = new ArrayList<>();
+        buildSets = new ArrayList<>();
 
         IPlanPrettyPrinter pp = context.getPrettyPrinter();
         printPlan(pp, (AbstractLogicalOperator) op, "Original Whole plan1");
-        boolean canTransform =
-                getJoinOpsAndLeafInputs(op, emptyTupleAndDataSourceOps, joinLeafInputsHashMap, joinOps, assignOps);
+        leafInputNumber = 0;
+        boolean canTransform = getJoinOpsAndLeafInputs(op);
 
         if (!canTransform) {
             return false;
         }
 
+        //convertOuterJoinstoJoinsIfPossible(outerJoinsDependencyList);
+
         // if this happens, something in the input plan is not acceptable to the new code.
         if (emptyTupleAndDataSourceOps.size() != joinLeafInputsHashMap.size()) {
             throw new IllegalStateException(
@@ -132,8 +159,17 @@
 
         int numberOfFromTerms = emptyTupleAndDataSourceOps.size();
 
+        //String viewInPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
+        //System.out.println("viewInPlan");
+        //System.out.println(viewInPlan);
+
+        if (buildSets.size() > 1) {
+            buildSets.sort(Comparator.comparingDouble(o -> o.second)); // sort on the number of tables in each set
+            // we need to build the smaller sets first. So we need to find these first.
+        }
         joinEnum.initEnum((AbstractLogicalOperator) op, cboMode, cboTestMode, numberOfFromTerms,
-                emptyTupleAndDataSourceOps, joinLeafInputsHashMap, joinOps, assignOps, context);
+                emptyTupleAndDataSourceOps, joinLeafInputsHashMap, allJoinOps, assignOps, outerJoinsDependencyList,
+                buildSets, context);
 
         if (cboMode) {
             if (!doAllDataSourcesHaveSamples(emptyTupleAndDataSourceOps, context)) {
@@ -147,7 +183,7 @@
         }
 
         if (assignOps.size() > 0) {
-            pushAssignsIntoLeafInputs(pp, joinLeafInputsHashMap, assignOps);
+            pushAssignsIntoLeafInputs(pp, joinLeafInputsHashMap, assignOps, assignJoinExprs);
         }
 
         if (LOGGER.isTraceEnabled()) {
@@ -165,26 +201,55 @@
         generateHintWarnings();
 
         if (numberOfFromTerms > 1) {
-            buildNewTree(cheapestPlanNode, joinLeafInputsHashMap, joinOps, new MutableInt(0));
-            printPlan(pp, (AbstractLogicalOperator) joinOps.get(0), "New Whole Plan after buildNewTree 1");
-            ILogicalOperator root = addRemainingAssignsAtTheTop(joinOps.get(0), assignOps);
-            printPlan(pp, (AbstractLogicalOperator) joinOps.get(0), "New Whole Plan after buildNewTree 2");
+            unUsedJoinOps = new boolean[allJoinOps.size()];
+            for (int i = 0; i < allJoinOps.size(); i++)
+                unUsedJoinOps[i] = true;
+            getNewJoinOps(cheapestPlanNode, allJoinOps);
+            if (allJoinOps.size() != newJoinOps.size()) {
+                return false; // there are some cases such as R OJ S on true. Here there is an OJ predicate but the code in findJoinConditions
+                // in JoinEnum does not capture this. Will fix later. Just bail for now.
+            }
+            buildNewTree(cheapestPlanNode, joinLeafInputsHashMap, newJoinOps, new MutableInt(0));
+            opRef.setValue(newJoinOps.get(0));
+            //String vp = new ALogicalPlanImpl(opRef).toString();
+
+            if (assignOps.size() > 0) {
+                for (int i = assignOps.size() - 1; i >= 0; i--) {
+                    MutableBoolean removed = new MutableBoolean(false);
+                    removed.setFalse();
+                    pushAssignsAboveJoins(newJoinOps.get(0), assignOps.get(i), assignJoinExprs.get(i), removed);
+                    if (removed.isTrue()) {
+                        assignOps.remove(i);
+                    }
+                }
+            }
+
+            printPlan(pp, (AbstractLogicalOperator) newJoinOps.get(0), "New Whole Plan after buildNewTree 1");
+            ILogicalOperator root = addRemainingAssignsAtTheTop(newJoinOps.get(0), assignOps);
+            printPlan(pp, (AbstractLogicalOperator) newJoinOps.get(0), "New Whole Plan after buildNewTree 2");
             printPlan(pp, (AbstractLogicalOperator) root, "New Whole Plan after buildNewTree");
+
             // this will be the new root
             opRef.setValue(root);
+            //String viewOutPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
+            //System.out.println("viewInPlan again");
+            //System.out.println(viewInPlan);
+            //System.out.println("viewOutPlan");
+            //System.out.println(viewOutPlan);
+
             if (LOGGER.isTraceEnabled()) {
                 LOGGER.trace("---------------------------- Printing Leaf Inputs");
                 printLeafPlans(pp, joinLeafInputsHashMap);
                 // print joins starting from the bottom
-                for (int i = joinOps.size() - 1; i >= 0; i--) {
-                    printPlan(pp, (AbstractLogicalOperator) joinOps.get(i), "join " + i);
+                for (int i = newJoinOps.size() - 1; i >= 0; i--) {
+                    printPlan(pp, (AbstractLogicalOperator) newJoinOps.get(i), "join " + i);
                 }
-                printPlan(pp, (AbstractLogicalOperator) joinOps.get(0), "New Whole Plan");
+                printPlan(pp, (AbstractLogicalOperator) newJoinOps.get(0), "New Whole Plan");
                 printPlan(pp, (AbstractLogicalOperator) root, "New Whole Plan");
             }
 
             // turn of this rule for all joins in this set (subtree)
-            for (ILogicalOperator joinOp : joinOps) {
+            for (ILogicalOperator joinOp : newJoinOps) {
                 context.addToDontApplySet(this, joinOp);
             }
 
@@ -195,11 +260,32 @@
         return true;
     }
 
+    private void pushAssignsAboveJoins(ILogicalOperator op, AssignOperator aOp, ILogicalExpression jexpr,
+            MutableBoolean removed) {
+        System.out.println("op " + op.toString());
+        if (!op.getInputs().isEmpty()) {
+            for (int i = 0; i < op.getInputs().size(); i++) {
+                ILogicalOperator oper = op.getInputs().get(i).getValue();
+                if (joinClause(oper)) {
+                    AbstractBinaryJoinOperator abOp = (AbstractBinaryJoinOperator) oper;
+                    ILogicalExpression expr = abOp.getCondition().getValue();
+                    if (expr.equals(jexpr)) {
+                        op.getInputs().get(i).setValue(aOp);
+                        aOp.getInputs().get(0).setValue(oper);
+                        removed.setTrue();
+                        return;
+                    }
+                }
+                pushAssignsAboveJoins(oper, aOp, jexpr, removed);
+            }
+        }
+    }
+
     private boolean joinClause(ILogicalOperator op) {
         if (op.getOperatorTag() == LogicalOperatorTag.INNERJOIN)
             return true;
-        //if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN)
-        //return true;
+        if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN)
+            return true;
         return false;
     }
 
@@ -262,7 +348,7 @@
         return (joinClause(op));
     }
 
-    // An internal edge must contain only assigns followed by an inner join
+    // An internal edge must contain only assigns followed by an inner join. Not sure if there will be other ops between joins
     private int numVarRefExprs(AssignOperator aOp) {
         List<Mutable<ILogicalExpression>> exprs = aOp.getExpressions();
         int count = 0;
@@ -288,6 +374,7 @@
                 return false;
             }
             assignOps.add(aOp);
+            assignJoinExprs.add(joinExprFound(op));
             op = op.getInputs().get(0).getValue();
         }
         return (joinClause(op));
@@ -318,31 +405,173 @@
         }
     }
 
+    private int getLeafInputId(LogicalVariable lv) {
+        if (varLeafInputIds.containsKey(lv))
+            return varLeafInputIds.get(lv);
+        return -1;
+    }
+
+    private boolean addLeafInputNumbersToVars(ILogicalOperator op) throws AlgebricksException {
+        HashSet<LogicalVariable> opVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, opVars);
+        for (LogicalVariable lv : opVars) {
+            int id = getLeafInputId(lv);
+            if ((id != -1) && (id != leafInputNumber)) {
+                return false; // this should not happen
+                // the same variable in different leaf Inputs is problematic for CBO
+            }
+            varLeafInputIds.put(lv, leafInputNumber);
+        }
+        return true;
+    }
+
+    private boolean foundVar(LogicalVariable inputLV, ILogicalOperator op) throws AlgebricksException {
+        HashSet<LogicalVariable> opVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, opVars);
+        if (opVars.contains(inputLV)) { // note that this will fail if there variables from different leafInputs
+            return true;
+        }
+        return false;
+    }
+
+    /* will implement this soon
+    // dependencylist is  first, second, op
+    // If we have R outer join S, first is the null extending table as in R, null
+    // In this case, if S is to joined, then R must be present. So S depends on R.
+    // If we have a case of (first, second, LOJ_operator) = (R_leaf_input_id, S_leaf_input_id, LOJop),
+    // and another (S_leaf_input_id, ..., joinOp),
+    // OR (..., S_leaf_input_id, joinOp) then the LOJ can be converted to a join!!
+    private void convertOuterJoinstoJoinsIfPossible(List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList) {
+        List<Integer> getRidOff = new ArrayList<>();
+        int i;
+        //for (Triple<Integer, Integer, ILogicalOperator> tr1 : outerJoinsDependencyList) {
+        for (i = 0; i < outerJoinsDependencyList.size(); i++) {
+            Quadruple<Integer, Integer, JoinOperator, Integer> tr1 = outerJoinsDependencyList.get(i);
+            if (tr1.getThird().getOuterJoin()) {
+                for (Quadruple<Integer, Integer, JoinOperator, Integer> tr2 : outerJoinsDependencyList) {
+                    if (tr2.getThird().getOuterJoin()) {
+                        if ((tr1.getSecond().equals(tr2.getFirst())) || (tr1.getSecond().equals(tr2.getSecond()))) {
+                            getRidOff.add(i);
+                        }
+                    }
+                }
+            }
+        }
+    
+        for (i = getRidOff.size() - 1; i >= 0; i--) {
+            int j = getRidOff.get(i);
+            JoinOperator joinOp = outerJoinsDependencyList.get(j).getThird();
+            joinOp.setOuterJoin(false);
+            outerJoinsDependencyList.remove(j);
+        }
+    }
+     */
+
+    // Each outer join will create one set of dependencies. The right side depends on the left side.
+    private boolean buildDependencyList(ILogicalOperator op, JoinOperator jO,
+            List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList, int rightSideBits)
+            throws AlgebricksException {
+        AbstractBinaryJoinOperator outerJoinOp = (AbstractBinaryJoinOperator) op;
+        ILogicalOperator leftOp = op.getInputs().get(0).getValue();
+        ILogicalExpression expr = outerJoinOp.getCondition().getValue();
+        int leftSideExprBits, rightSideExprBits;
+        List<LogicalVariable> joinExprVars;
+        List<Mutable<ILogicalExpression>> conjs = new ArrayList<>();
+        if (expr.splitIntoConjuncts(conjs)) {
+            for (Mutable<ILogicalExpression> conj : conjs) {
+                joinExprVars = new ArrayList<>();
+                leftSideExprBits = 0;
+                rightSideExprBits = 0;
+                conj.getValue().getUsedVariables(joinExprVars);
+                for (LogicalVariable lv : joinExprVars) {
+                    if (foundVar(lv, leftOp)) {
+                        leftSideExprBits |= 1 << (getLeafInputId(lv) - 1);
+                    } else {
+                        rightSideExprBits |= 1 << (getLeafInputId(lv) - 1);
+                    }
+                }
+                if (leftSideExprBits != 0 && rightSideExprBits != 0) {// avoid expressions like true
+                    outerJoinsDependencyList.add(new Quadruple(leftSideExprBits, rightSideBits, jO, 1));
+                }
+            }
+        } else {
+            leftSideExprBits = 0;
+            rightSideExprBits = 0;
+            joinExprVars = new ArrayList<>();
+            expr.getUsedVariables(joinExprVars);
+            for (LogicalVariable lv : joinExprVars) {
+                if (foundVar(lv, leftOp)) {
+                    leftSideExprBits |= 1 << (getLeafInputId(lv) - 1);
+                } else {
+                    rightSideExprBits |= 1 << (getLeafInputId(lv) - 1);
+                }
+            }
+            if (leftSideExprBits != 0 && rightSideExprBits != 0) {// avoid expressions like true
+                outerJoinsDependencyList.add(new Quadruple(leftSideExprBits, rightSideBits, jO, 1));
+            }
+        }
+        return true;
+    }
+
+    private ILogicalExpression joinExprFound(ILogicalOperator op) {
+        if (!op.getInputs().isEmpty()) {
+            for (int i = 0; i < op.getInputs().size(); i++) {
+                ILogicalOperator oper = op.getInputs().get(i).getValue();
+                if (joinClause(oper)) {
+                    AbstractBinaryJoinOperator abOp = (AbstractBinaryJoinOperator) oper;
+                    return abOp.getCondition().getValue();
+                }
+                return joinExprFound(oper);
+            }
+        } else {
+            return null;
+        }
+        return null;
+    }
+
     /**
      * This is the main routine that stores all the join operators and the leafInputs. We will later reuse the same
      * join operators but switch the leafInputs (see buildNewTree). The whole scheme is based on the assumption that the
      * leafInputs can be switched. The various data structures make the leafInputs accessible efficiently.
      */
-    private boolean getJoinOpsAndLeafInputs(ILogicalOperator op,
-            List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps,
-            HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<ILogicalOperator> joinOps,
-            List<AssignOperator> assignOps) {
-
-        if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
-            return false;
-        }
+    private boolean getJoinOpsAndLeafInputs(ILogicalOperator op) throws AlgebricksException {
 
         if (joinClause(op)) {
-            joinOps.add(op);
+            JoinOperator jO = new JoinOperator((AbstractBinaryJoinOperator) op);
+            allJoinOps.add(jO);
+            if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+                jO.setOuterJoin(true);
+            }
+
+            int firstLeafInputNumber, lastLeafInputNumber;
+            int k = 0;
             for (int i = 0; i < 2; i++) {
                 ILogicalOperator nextOp = op.getInputs().get(i).getValue();
-                boolean canTransform = getJoinOpsAndLeafInputs(nextOp, emptyTupleAndDataSourceOps,
-                        joinLeafInputsHashMap, joinOps, assignOps);
+                firstLeafInputNumber = leafInputNumber + 1; // we are interested in the 2nd input only
+                boolean canTransform = getJoinOpsAndLeafInputs(nextOp);
                 if (!canTransform) {
                     return false;
                 }
+                lastLeafInputNumber = leafInputNumber; // we are interested in the 2nd input only
+                k = 0;
+                // now we know the leafInput numbers that occurred on the right side of this join.
+                if ((op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) && (i == 1)) {
+                    for (int j = firstLeafInputNumber; j <= lastLeafInputNumber; j++) {
+                        k |= 1 << (j - 1);
+                    }
+                    if (firstLeafInputNumber < lastLeafInputNumber) { // if more is than one leafInput, only then buildSets make sense.
+                        buildSets.add(new Triple<>(k, lastLeafInputNumber - firstLeafInputNumber + 1, true)); // convert the second to boolean later
+                    }
+                    boolean ret = buildDependencyList(op, jO, outerJoinsDependencyList, k);
+                    if (!ret) {
+                        return false;
+                    }
+                }
             }
         } else {
+            if (op.getOperatorTag() == LogicalOperatorTag.GROUP) { // cannot handle group by's in leaf Inputs.
+                return false;
+            }
             Pair<EmptyTupleSourceOperator, DataSourceScanOperator> etsDataSource = containsLeafInputOnly(op);
             if (etsDataSource != null) { // a leaf input
                 EmptyTupleSourceOperator etsOp = etsDataSource.first;
@@ -356,17 +585,16 @@
                         joinLeafInputsHashMap.put(etsOp, selectOp);
                     }
                 } else {
+                    leafInputNumber++;
                     joinLeafInputsHashMap.put(etsOp, op);
+                    if (!addLeafInputNumbersToVars(op)) {
+                        return false;
+                    }
                 }
             } else { // This must be an internal edge
                 if (onlyAssigns(op, assignOps)) {
-                    //if (onlyOneAssign(op, assignOps)) {
-                    // currently, will handle only assign statement and nothing else in an internal Edge.
-                    // we can lift this restriction later if the need arises. This just makes some code easier.
-
                     ILogicalOperator skipAssisgnsOp = skipPastAssigns(op);
-                    boolean canTransform = getJoinOpsAndLeafInputs(skipAssisgnsOp, emptyTupleAndDataSourceOps,
-                            joinLeafInputsHashMap, joinOps, assignOps);
+                    boolean canTransform = getJoinOpsAndLeafInputs(skipAssisgnsOp);
                     if (!canTransform) {
                         return false;
                     }
@@ -375,7 +603,6 @@
                 }
             }
         }
-
         return true;
     }
 
@@ -438,11 +665,14 @@
         }
     }
 
-    private int findAssignOp(ILogicalOperator leafInput, List<AssignOperator> assignOps) throws AlgebricksException {
+    private int findAssignOp(ILogicalOperator leafInput, List<AssignOperator> assignOps,
+            List<ILogicalExpression> assignJoinExprs) throws AlgebricksException {
         int i = -1;
 
         for (AssignOperator aOp : assignOps) {
             i++;
+            if (assignJoinExprs.get(i) != null)
+                continue; // this is an assign associated with a join expression
             // this will be an Assign, so no need to check
             List<LogicalVariable> vars = new ArrayList<>();
             aOp.getExpressions().get(0).getValue().getUsedVariables(vars);
@@ -466,10 +696,7 @@
         if (plan.scanOp == PlanNode.ScanMethod.TABLE_SCAN && leafInput.getOperatorTag() == LogicalOperatorTag.SELECT) {
             SelectOperator selOper = (SelectOperator) leafInput;
             ILogicalExpression expr = selOper.getCondition().getValue();
-
             List<Mutable<ILogicalExpression>> conjs = new ArrayList<>();
-
-            conjs.clear();
             if (expr.splitIntoConjuncts(conjs)) {
                 conjs.remove(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
                 for (Mutable<ILogicalExpression> conj : conjs) {
@@ -503,17 +730,48 @@
         addCardCostAnnotations(findDataSourceScanOperator(leftInput), plan);
     }
 
+    private void getJoinNode(PlanNode plan, List<JoinOperator> allJoinOps) throws AlgebricksException {
+        //AbstractBinaryJoinOperator joinOp;
+        Boolean outerJoin;
+        LogicalOperatorTag tag;
+        if (plan.outerJoin) {
+            outerJoin = true;
+        } else {
+            outerJoin = false;
+        }
+        int i = -1;
+        for (JoinOperator ajOp : allJoinOps) {
+            i++;
+            if (ajOp.getOuterJoin() == outerJoin && unUsedJoinOps[i]) {
+                unUsedJoinOps[i] = false;
+                newJoinOps.add(ajOp.getAbstractJoinOp());
+                break;
+            }
+        }
+    }
+
+    private void getNewJoinOps(PlanNode plan, List<JoinOperator> allJoinOps) throws AlgebricksException {
+        if (plan.IsJoinNode()) {
+            getJoinNode(plan, allJoinOps);
+            getNewJoinOps(plan.getLeftPlanNode(), allJoinOps);
+            getNewJoinOps(plan.getRightPlanNode(), allJoinOps);
+        }
+    }
+
     // This one is for join queries
     private void buildNewTree(PlanNode plan, HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap,
-            List<ILogicalOperator> joinOps, MutableInt totalNumberOfJoins) {
+            List<ILogicalOperator> joinOps, MutableInt totalNumberOfJoins) throws AlgebricksException {
         // we have to move the inputs in op around so that they match the tree structure in pn
         // we use the existing joinOps and switch the leafInputs appropriately.
         List<PlanNode> allPlans = joinEnum.getAllPlans();
         int leftIndex = plan.getLeftPlanIndex();
         int rightIndex = plan.getRightPlanIndex();
+        //System.out.println("allPlansSize " + allPlans.size() + " leftIndex " + leftIndex + " rightIndex " + rightIndex); // put in trace statements
+        //System.out.println("allPlansSize " + allPlans.size());
         PlanNode leftPlan = allPlans.get(leftIndex);
         PlanNode rightPlan = allPlans.get(rightIndex);
-        ILogicalOperator joinOp = joinOps.get(totalNumberOfJoins.intValue());
+
+        ILogicalOperator joinOp = joinOps.get(totalNumberOfJoins.intValue()); // intValue set to 0 initially
 
         if (plan.IsJoinNode()) {
             AbstractBinaryJoinOperator abJoinOp = (AbstractBinaryJoinOperator) joinOp;
@@ -567,7 +825,7 @@
             totalNumberOfJoins.increment();
             ILogicalOperator leftInput = joinOps.get(totalNumberOfJoins.intValue());
             joinOp.getInputs().get(0).setValue(leftInput);
-            buildNewTree(allPlans.get(leftIndex), joinLeafInputsHashMap, joinOps, totalNumberOfJoins);
+            buildNewTree(leftPlan, joinLeafInputsHashMap, joinOps, totalNumberOfJoins);
         }
 
         if (rightPlan.IsScanNode()) {
@@ -585,7 +843,7 @@
             totalNumberOfJoins.increment();
             ILogicalOperator rightInput = joinOps.get(totalNumberOfJoins.intValue());
             joinOp.getInputs().get(1).setValue(rightInput);
-            buildNewTree(allPlans.get(rightIndex), joinLeafInputsHashMap, joinOps, totalNumberOfJoins);
+            buildNewTree(rightPlan, joinLeafInputsHashMap, joinOps, totalNumberOfJoins);
         }
     }
 
@@ -628,14 +886,14 @@
     // for every internal edge assign (again assuming only 1 for now), find the corresponding leafInput and place the assign
     // on top of that LeafInput. Modify the joinLeafInputsHashMap as well.
     private void pushAssignsIntoLeafInputs(IPlanPrettyPrinter pp,
-            HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<AssignOperator> assignOps)
-            throws AlgebricksException {
+            HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<AssignOperator> assignOps,
+            List<ILogicalExpression> assignJoinExprs) throws AlgebricksException {
 
         for (Map.Entry<EmptyTupleSourceOperator, ILogicalOperator> mapElement : joinLeafInputsHashMap.entrySet()) {
             ILogicalOperator joinLeafInput = mapElement.getValue();
             printPlan(pp, (AbstractLogicalOperator) joinLeafInput, "Incoming leaf Input");
             EmptyTupleSourceOperator ets = mapElement.getKey();
-            int assignNumber = findAssignOp(joinLeafInput, assignOps);
+            int assignNumber = findAssignOp(joinLeafInput, assignOps, assignJoinExprs);
             if (assignNumber != -1) {
                 joinLeafInput = addAssignToLeafInput(joinLeafInput, assignOps.get(assignNumber));
                 printPlan(pp, (AbstractLogicalOperator) joinLeafInput, "Modified leaf Input");
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinCondition.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinCondition.java
index 7cf1ebd..d56d38a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinCondition.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinCondition.java
@@ -26,6 +26,7 @@
     protected static final int NO_JC = -1;
 
     protected ILogicalExpression joinCondition;
+    protected boolean outerJoin;
     private boolean derived = false;
     protected boolean partOfComposite = false;
     protected int numberOfVars = 0; // how many variables
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index cdeafea..140a069 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -53,6 +53,8 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Quadruple;
+import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -105,10 +107,15 @@
     private List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps;
     protected Map<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap;
     protected List<ILogicalExpression> singleDatasetPreds;
-    private List<AssignOperator> assignOps;
-    private List<ILogicalOperator> joinOps;
+    protected List<AssignOperator> assignOps;
+    List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList;
+    protected List<JoinOperator> allJoinOps;
     protected ILogicalOperator localJoinOp; // used in nestedLoopsApplicable code.
     protected IOptimizationContext optCtx;
+    protected boolean outerJoin;
+    protected List<Triple<Integer, Integer, Boolean>> buildSets;
+    protected int allTabsJnNum; // keeps track of the join Node where all the tables have been joined
+    protected int maxBits; // the joinNode where the dataset bits are the highest is where all the tables have been joined
 
     protected Stats stats;
     private boolean cboMode;
@@ -128,8 +135,10 @@
 
     protected void initEnum(AbstractLogicalOperator op, boolean cboMode, boolean cboTestMode, int numberOfFromTerms,
             List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps,
-            Map<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<ILogicalOperator> joinOps,
-            List<AssignOperator> assignOps, IOptimizationContext context) throws AsterixException {
+            Map<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<JoinOperator> allJoinOps,
+            List<AssignOperator> assignOps,
+            List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList,
+            List<Triple<Integer, Integer, Boolean>> buildSets, IOptimizationContext context) throws AsterixException {
         this.singleDatasetPreds = new ArrayList<>();
         this.joinConditions = new ArrayList<>();
         this.joinHints = new HashMap<>();
@@ -144,11 +153,16 @@
         this.emptyTupleAndDataSourceOps = emptyTupleAndDataSourceOps;
         this.joinLeafInputsHashMap = joinLeafInputsHashMap;
         this.assignOps = assignOps;
-        this.joinOps = joinOps;
+        this.outerJoin = false; // assume no outerjoins anywhere in the query at first.
+        this.outerJoinsDependencyList = outerJoinsDependencyList;
+        this.allJoinOps = allJoinOps;
+        this.buildSets = buildSets;
         this.op = op;
         this.forceJoinOrderMode = getForceJoinOrderMode(context);
         this.queryPlanShape = getQueryPlanShape(context);
         initCostHandleAndJoinNodes(context);
+        this.allTabsJnNum = 1; // keeps track of where the final join Node will be. In case of bushy plans, this may not always be the last join nod     e.
+        this.maxBits = 1;
     }
 
     protected void initCostHandleAndJoinNodes(IOptimizationContext context) {
@@ -217,7 +231,7 @@
                 return op;
             }
         }
-        // this will never happen, but keep compiler happy
+
         return null;
     }
 
@@ -279,6 +293,16 @@
         return eqPredFound ? andExpr : null;
     }
 
+    protected boolean lookForOuterJoins(List<Integer> newJoinConditions) {
+        for (int joinNum : newJoinConditions) {
+            JoinCondition jc = joinConditions.get(joinNum);
+            if (jc.outerJoin) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     protected HashJoinExpressionAnnotation findHashJoinHint(List<Integer> newJoinConditions) {
         for (int i : newJoinConditions) {
             JoinCondition jc = joinConditions.get(i);
@@ -358,7 +382,6 @@
                 return i;
             }
         }
-        // should never happen; keep compiler happy.
         return JoinNode.NO_JN;
     }
 
@@ -400,7 +423,7 @@
             }
             return bits;
         }
-        // should never reach this because every variable must exist in some leaf input.
+
         return JoinNode.NO_JN;
     }
 
@@ -408,21 +431,29 @@
     // It also fills in the dataset Bits for each join predicate.
     private void findJoinConditionsAndAssignSels() throws AlgebricksException {
         List<Mutable<ILogicalExpression>> conjs = new ArrayList<>();
-        for (ILogicalOperator jOp : joinOps) {
-            AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) jOp;
+        for (JoinOperator jOp : allJoinOps) {
+            AbstractBinaryJoinOperator joinOp = jOp.getAbstractJoinOp();
             ILogicalExpression expr = joinOp.getCondition().getValue();
             conjs.clear();
             if (expr.splitIntoConjuncts(conjs)) {
                 conjs.remove(new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
                 for (Mutable<ILogicalExpression> conj : conjs) {
                     JoinCondition jc = new JoinCondition();
+                    jc.outerJoin = jOp.getOuterJoin();
+                    if (jc.outerJoin) {
+                        outerJoin = true;
+                    }
                     jc.joinCondition = conj.getValue().cloneExpression();
                     joinConditions.add(jc);
                     jc.selectivity = stats.getSelectivityFromAnnotationMain(jc.joinCondition, true);
                 }
             } else {
-                if ((expr.getExpressionTag().equals(LogicalExpressionTag.FUNCTION_CALL))) {
+                if ((expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL)) {
                     JoinCondition jc = new JoinCondition();
+                    jc.outerJoin = jOp.getOuterJoin();
+                    if (jc.outerJoin) {
+                        outerJoin = true;
+                    }
                     // change to not a true condition
                     jc.joinCondition = expr.cloneExpression();
                     joinConditions.add(jc);
@@ -568,12 +599,51 @@
         return dataRecVarInScan.toString().substring(2);
     }
 
+    private boolean isThisCombinationPossible(JoinNode leftJn, JoinNode rightJn) {
+        for (Quadruple<Integer, Integer, JoinOperator, Integer> tr : outerJoinsDependencyList) {
+            if (tr.getThird().getOuterJoin()) {
+                if (rightJn.datasetBits == tr.getSecond()) { // A dependent table(s) is being joined. Find if other table(s) is present
+                    if (!((leftJn.datasetBits & tr.getFirst()) > 0)) {
+                        return false; // required table not found
+                    }
+                }
+            }
+        }
+
+        if (leftJn.level == 1) { // if we are at a higher level, there is nothing to check as these tables have been joined already in leftJn
+            for (Quadruple<Integer, Integer, JoinOperator, Integer> tr : outerJoinsDependencyList) {
+                if (tr.getThird().getOuterJoin()) {
+                    if (leftJn.datasetBits == tr.getSecond()) { // A dependent table(s) is being joined. Find if other table(s) is present
+                        if (!((rightJn.datasetBits & tr.getFirst()) > 0)) {
+                            return false; // required table not found
+                        }
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    private int findBuildSet(int jbits, int numbTabs) {
+        int i;
+        if (buildSets.isEmpty()) {
+            return -1;
+        }
+        for (i = 0; i < buildSets.size(); i++) {
+            //System.out.println("first " + buildSets.get(i).first + " second " + buildSets.get(i).second + " numtabs " + numbTabs + " bits " + jbits);
+            if ((buildSets.get(i).third) && (buildSets.get(i).first & jbits) > 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
     private int addNonBushyJoinNodes(int level, int jnNumber, int[] startJnAtLevel) throws AlgebricksException {
         // adding joinNodes of level (2, 3, ..., numberOfTerms)
         int startJnSecondLevel = startJnAtLevel[2];
         int startJnPrevLevel = startJnAtLevel[level - 1];
         int startJnNextLevel = startJnAtLevel[level];
-        int i, j, addPlansToThisJn;
+        int i, j, k, addPlansToThisJn;
 
         // walking thru the previous level
         for (i = startJnPrevLevel; i < startJnNextLevel; i++) {
@@ -584,8 +654,14 @@
                 continue;
             }
 
-            // walk thru the first level here
-            for (j = 1; j < startJnSecondLevel; j++) {
+            int endLevel;
+            if (outerJoin && buildSets.size() > 0) { // we do not need outerJoin here but ok for now. BuildSets are built only when we have outerjoins
+                endLevel = startJnNextLevel; // bushy trees possible
+            } else {
+                endLevel = startJnSecondLevel; // no bushy trees
+            }
+
+            for (j = 1; j < endLevel; j++) { // this enables bushy plans; dangerous :-) should be done only if outer joins are present.
                 if (level == 2 && i > j) {
                     // don't want to generate x y and y x. we will do this in plan generation.
                     continue;
@@ -596,7 +672,28 @@
                     // these already have some common table
                     continue;
                 }
+                //System.out.println("Before1 i = " + i + " j = " + j); // will put these in trace statements soon
+                //System.out.println("Before1 Jni Dataset bits = " + jnI.datasetBits + " Jni Dataset bits = " + jnJ.datasetBits);
+                // first check if the new table is part of a buildSet.
+                k = findBuildSet(jnJ.datasetBits, jnI.level + jnJ.level);
+                //System.out.println("Buildset " + k);
+                if (k > -1) {
+                    if ((jnI.datasetBits & buildSets.get(k).first) == 0) { // i should also be part of the buildSet
+                        continue;
+                    }
+                }
+                //System.out.println("Before2 i = " + i + " j = " + j); // put these in trace statements
+                //System.out.println("Before2 Jni Dataset bits = " + jnI.datasetBits + " Jni Dataset bits = " + jnJ.datasetBits);
+                //System.out.println("Before i = " + i + " j = " + j);
+                if (!isThisCombinationPossible(jnI, jnJ)) {
+                    continue;
+                }
+                //System.out.println("After i = " + i + " j = " + j); //put these in trace statements
+                //System.out.println("After Jni Dataset bits = " + jnI.datasetBits + " Jni Dataset bits = " + jnJ.datasetBits);
                 int newBits = jnI.datasetBits | jnJ.datasetBits;
+                if ((k > 0) && (newBits == buildSets.get(k).first)) { // This buildSet is no longer needed.
+                    buildSets.get(k).third = false;
+                }
                 JoinNode jnNewBits = jnArray[newBits];
                 jnNewBits.jnArrayIndex = newBits;
                 // visiting this join node for the first time
@@ -609,6 +706,10 @@
                     // Then jn[33].highestKeyspaceId will equal 5
                     // if this joinNode ever gets removed, then set jn[19].highestKeyspaceId = 0
                     jn.datasetBits = newBits;
+                    if (newBits > maxBits) {
+                        maxBits = newBits;
+                        allTabsJnNum = jnNumber;
+                    }
                     jnNewBits.jnIndex = addPlansToThisJn = jnNumber;
                     jn.level = level;
                     jn.highestDatasetId = Math.max(jnI.highestDatasetId, j);
@@ -637,6 +738,7 @@
 
                 JoinNode jnIJ = jnArray[addPlansToThisJn];
                 jnIJ.jnArrayIndex = addPlansToThisJn;
+
                 jnIJ.addMultiDatasetPlans(jnI, jnJ);
                 if (forceJoinOrderMode && level > cboFullEnumLevel) {
                     break;
@@ -762,6 +864,10 @@
             ILogicalOperator leafInput = this.joinLeafInputsHashMap.get(ets);
             if (!cboTestMode) {
                 if (idxDetails == null) {
+                    dataScanPlan = jn.addSingleDatasetPlans();
+                    if (dataScanPlan == PlanNode.NO_PLAN) {
+                        return PlanNode.NO_PLAN;
+                    }
                     continue;
                 }
                 double origDatasetCard, finalDatasetCard, sampleCard;
@@ -861,13 +967,11 @@
     }
 
     private ILogicalOperator findASelectOp(ILogicalOperator op) {
-
         while (op != null && op.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
 
             if (op.getOperatorTag() == LogicalOperatorTag.SELECT) {
                 return op;
             }
-
             op = op.getInputs().get(0).getValue();
         }
         return null;
@@ -973,7 +1077,7 @@
 
         markCompositeJoinPredicates();
         int lastJnNum = enumerateHigherLevelJoinNodes();
-        JoinNode lastJn = jnArray[lastJnNum];
+        JoinNode lastJn = jnArray[allTabsJnNum];
         if (LOGGER.isTraceEnabled()) {
             EnumerateJoinsRule.printPlan(pp, op, "Original Whole plan in JN END");
             LOGGER.trace(dumpJoinNodes(lastJnNum));
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
index 6c5b2ca..41ef4af 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
@@ -44,6 +44,7 @@
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Quadruple;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -695,14 +696,37 @@
         }
     }
 
-    private int buildHashJoinPlan(JoinNode leftJn, JoinNode rightJn, PlanNode leftPlan, PlanNode rightPlan,
-            ILogicalExpression hashJoinExpr, HashJoinExpressionAnnotation hintHashJoin) {
+    // check if the left side or the right side is the preserving side.
+    // The preserving side has to be the probe side (which is the left side since our engine builds from the right side)
+    // R LOJ S -- R is the preserving side; S is the null extending side.
+    // In the dependency list, S will the second entry in the quadruple.
+    // So R must be on the left side and S must be on the right side.
+
+    private boolean nullExtendingSide(int bits, boolean outerJoin) {
+        if (outerJoin) {
+            for (Quadruple<Integer, Integer, JoinOperator, Integer> qu : joinEnum.outerJoinsDependencyList) {
+                if (qu.getThird().getOuterJoin()) {
+                    if (qu.getSecond() == bits) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    protected int buildHashJoinPlan(JoinNode leftJn, JoinNode rightJn, PlanNode leftPlan, PlanNode rightPlan,
+            ILogicalExpression hashJoinExpr, HashJoinExpressionAnnotation hintHashJoin, boolean outerJoin) {
         List<PlanNode> allPlans = joinEnum.allPlans;
         PlanNode pn, cheapestPlan;
         ICost hjCost, leftExchangeCost, rightExchangeCost, childCosts, totalCost;
         this.leftJn = leftJn;
         this.rightJn = rightJn;
 
+        if (nullExtendingSide(leftJn.datasetBits, outerJoin)) {
+            return PlanNode.NO_PLAN;
+        }
+
         if (hashJoinExpr == null || hashJoinExpr == ConstantExpression.TRUE) {
             return PlanNode.NO_PLAN;
         }
@@ -717,7 +741,7 @@
             return PlanNode.NO_PLAN;
         }
         boolean forceEnum = hintHashJoin != null || joinEnum.forceJoinOrderMode
-                || !joinEnum.queryPlanShape.equals(AlgebricksConfig.QUERY_PLAN_SHAPE_ZIGZAG)
+                || !joinEnum.queryPlanShape.equals(AlgebricksConfig.QUERY_PLAN_SHAPE_ZIGZAG) || outerJoin
                 || level <= joinEnum.cboFullEnumLevel;
         if (rightJn.cardinality * rightJn.size <= leftJn.cardinality * leftJn.size || forceEnum) {
             // We want to build with the smaller side.
@@ -730,6 +754,7 @@
             if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
                 pn = new PlanNode(allPlans.size(), joinEnum);
                 pn.setJoinNode(this);
+                pn.outerJoin = outerJoin;
                 pn.setLeftJoinIndex(leftJn.jnArrayIndex);
                 pn.setRightJoinIndex(rightJn.jnArrayIndex);
                 pn.setLeftPlanIndex(leftPlan.allPlansIndex);
@@ -758,7 +783,7 @@
     }
 
     private int buildBroadcastHashJoinPlan(JoinNode leftJn, JoinNode rightJn, PlanNode leftPlan, PlanNode rightPlan,
-            ILogicalExpression hashJoinExpr, BroadcastExpressionAnnotation hintBroadcastHashJoin) {
+            ILogicalExpression hashJoinExpr, BroadcastExpressionAnnotation hintBroadcastHashJoin, boolean outerJoin) {
         List<PlanNode> allPlans = joinEnum.allPlans;
         PlanNode pn, cheapestPlan;
         ICost bcastHjCost, leftExchangeCost, rightExchangeCost, childCosts, totalCost;
@@ -766,6 +791,10 @@
         this.leftJn = leftJn;
         this.rightJn = rightJn;
 
+        if (nullExtendingSide(leftJn.datasetBits, outerJoin)) {
+            return PlanNode.NO_PLAN;
+        }
+
         if (hashJoinExpr == null || hashJoinExpr == ConstantExpression.TRUE) {
             return PlanNode.NO_PLAN;
         }
@@ -781,7 +810,7 @@
         }
 
         boolean forceEnum = hintBroadcastHashJoin != null || joinEnum.forceJoinOrderMode
-                || !joinEnum.queryPlanShape.equals(AlgebricksConfig.QUERY_PLAN_SHAPE_ZIGZAG)
+                || !joinEnum.queryPlanShape.equals(AlgebricksConfig.QUERY_PLAN_SHAPE_ZIGZAG) || outerJoin
                 || level <= joinEnum.cboFullEnumLevel;
         if (rightJn.cardinality * rightJn.size <= leftJn.cardinality * leftJn.size || forceEnum) {
             // We want to broadcast and build with the smaller side.
@@ -794,6 +823,7 @@
             if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
                 pn = new PlanNode(allPlans.size(), joinEnum);
                 pn.setJoinNode(this);
+                pn.outerJoin = outerJoin;
                 pn.setLeftJoinIndex(leftJn.jnArrayIndex);
                 pn.setRightJoinIndex(rightJn.jnArrayIndex);
                 pn.setLeftPlanIndex(leftPlan.allPlansIndex);
@@ -822,7 +852,7 @@
     }
 
     private int buildNLJoinPlan(JoinNode leftJn, JoinNode rightJn, PlanNode leftPlan, PlanNode rightPlan,
-            ILogicalExpression nestedLoopJoinExpr, IndexedNLJoinExpressionAnnotation hintNLJoin)
+            ILogicalExpression nestedLoopJoinExpr, IndexedNLJoinExpressionAnnotation hintNLJoin, boolean outerJoin)
             throws AlgebricksException {
 
         // Build a nested loops plan, first check if it is possible
@@ -834,6 +864,11 @@
 
         this.leftJn = leftJn;
         this.rightJn = rightJn;
+
+        if (nullExtendingSide(leftJn.datasetBits, outerJoin)) {
+            return PlanNode.NO_PLAN;
+        }
+
         if (rightJn.jnArrayIndex > numberOfTerms) {
             // right side consists of more than one table
             return PlanNode.NO_PLAN; // nested loop plan not possible.
@@ -848,10 +883,12 @@
         rightExchangeCost = joinEnum.getCostHandle().zeroCost();
         childCosts = allPlans.get(leftPlan.allPlansIndex).totalCost;
         totalCost = nljCost.costAdd(leftExchangeCost).costAdd(childCosts);
-        boolean forceEnum = hintNLJoin != null || joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel;
+        boolean forceEnum =
+                hintNLJoin != null || joinEnum.forceJoinOrderMode || outerJoin || level <= joinEnum.cboFullEnumLevel;
         if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
             pn = new PlanNode(allPlans.size(), joinEnum);
             pn.setJoinNode(this);
+            pn.outerJoin = outerJoin;
             pn.setLeftJoinIndex(leftJn.jnArrayIndex);
             pn.setRightJoinIndex(rightJn.jnArrayIndex);
             pn.setLeftPlanIndex(leftPlan.allPlansIndex);
@@ -878,7 +915,7 @@
     }
 
     private int buildCPJoinPlan(JoinNode leftJn, JoinNode rightJn, PlanNode leftPlan, PlanNode rightPlan,
-            ILogicalExpression hashJoinExpr, ILogicalExpression nestedLoopJoinExpr) {
+            ILogicalExpression hashJoinExpr, ILogicalExpression nestedLoopJoinExpr, boolean outerJoin) {
         // Now build a cartesian product nested loops plan
         List<PlanNode> allPlans = joinEnum.allPlans;
         PlanNode pn, cheapestPlan;
@@ -888,6 +925,10 @@
             return PlanNode.NO_PLAN;
         }
 
+        if (nullExtendingSide(leftJn.datasetBits, outerJoin)) {
+            return PlanNode.NO_PLAN;
+        }
+
         this.leftJn = leftJn;
         this.rightJn = rightJn;
 
@@ -915,10 +956,11 @@
         childCosts =
                 allPlans.get(leftPlan.allPlansIndex).totalCost.costAdd(allPlans.get(rightPlan.allPlansIndex).totalCost);
         totalCost = cpCost.costAdd(rightExchangeCost).costAdd(childCosts);
-        boolean forceEnum = joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel;
+        boolean forceEnum = joinEnum.forceJoinOrderMode || outerJoin || level <= joinEnum.cboFullEnumLevel;
         if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
             pn = new PlanNode(allPlans.size(), joinEnum);
             pn.setJoinNode(this);
+            pn.outerJoin = outerJoin;
             pn.setLeftJoinIndex(leftJn.jnArrayIndex);
             pn.setRightJoinIndex(rightJn.jnArrayIndex);
             pn.setLeftPlanIndex(leftPlan.allPlansIndex);
@@ -995,6 +1037,7 @@
         }
         ILogicalExpression hashJoinExpr = joinEnum.getHashJoinExpr(newJoinConditions);
         ILogicalExpression nestedLoopJoinExpr = joinEnum.getNestedLoopJoinExpr(newJoinConditions);
+        boolean outerJoin = joinEnum.lookForOuterJoins(newJoinConditions);
 
         double current_card = this.cardinality;
         if (current_card >= Cost.MAX_CARD) {
@@ -1026,13 +1069,14 @@
                         || rightJn.aliases.contains(buildOrProbeObject)))
                         || (probe && (leftJn.datasetNames.contains(buildOrProbeObject)
                                 || leftJn.aliases.contains(buildOrProbeObject)))) {
-                    hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, hintHashJoin);
+                    hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, hintHashJoin,
+                            outerJoin);
                 } else if ((build && (leftJn.datasetNames.contains(buildOrProbeObject)
                         || leftJn.aliases.contains(buildOrProbeObject)))
                         || (probe && (rightJn.datasetNames.contains(buildOrProbeObject)
                                 || rightJn.aliases.contains(buildOrProbeObject)))) {
-                    commutativeHjPlan =
-                            buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, hintHashJoin);
+                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr,
+                            hintHashJoin, outerJoin);
                 }
             }
             if (hjPlan == PlanNode.NO_PLAN && commutativeHjPlan == PlanNode.NO_PLAN) {
@@ -1048,24 +1092,27 @@
                                         (build ? "build " : "probe ") + "with " + buildOrProbeObject));
                     }
                 }
-                hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+                hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                    commutativeHjPlan =
+                            buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null, outerJoin);
                 }
-                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+                bcastHjPlan =
+                        buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeBcastHjPlan =
-                            buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan,
+                            hashJoinExpr, null, outerJoin);
                 }
-                nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, null);
+                nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
                     commutativeNljPlan =
-                            buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, null);
+                            buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, null, outerJoin);
                 }
-                cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr);
+                cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr,
+                        outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeCpPlan =
-                            buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, nestedLoopJoinExpr);
+                    commutativeCpPlan = buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr,
+                            nestedLoopJoinExpr, outerJoin);
                 }
             }
         } else if (hintBroadcastHashJoin != null) {
@@ -1080,18 +1127,18 @@
                 joinEnum.joinHints.put(hintBroadcastHashJoin, null);
                 if (rightJn.datasetNames.contains(broadcastObject) || rightJn.aliases.contains(broadcastObject)) {
                     bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr,
-                            hintBroadcastHashJoin);
+                            hintBroadcastHashJoin, outerJoin);
                 } else if (leftJn.datasetNames.contains(broadcastObject) || leftJn.aliases.contains(broadcastObject)) {
                     commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan,
-                            hashJoinExpr, hintBroadcastHashJoin);
+                            hashJoinExpr, hintBroadcastHashJoin, outerJoin);
                 }
             } else if (broadcastObject == null) {
                 joinEnum.joinHints.put(hintBroadcastHashJoin, null);
                 bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr,
-                        hintBroadcastHashJoin);
+                        hintBroadcastHashJoin, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
                     commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan,
-                            hashJoinExpr, hintBroadcastHashJoin);
+                            hashJoinExpr, hintBroadcastHashJoin, outerJoin);
                 }
             }
             if (bcastHjPlan == PlanNode.NO_PLAN && commutativeBcastHjPlan == PlanNode.NO_PLAN) {
@@ -1108,32 +1155,35 @@
                     }
                 }
 
-                hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+                hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                    commutativeHjPlan =
+                            buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null, outerJoin);
                 }
-                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+                bcastHjPlan =
+                        buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeBcastHjPlan =
-                            buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan,
+                            hashJoinExpr, null, outerJoin);
                 }
-                nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, null);
+                nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
                     commutativeNljPlan =
-                            buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, null);
+                            buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, null, outerJoin);
                 }
-                cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr);
+                cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr,
+                        outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeCpPlan =
-                            buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, nestedLoopJoinExpr);
+                    commutativeCpPlan = buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr,
+                            nestedLoopJoinExpr, outerJoin);
                 }
             }
         } else if (hintNLJoin != null) {
             joinEnum.joinHints.put(hintNLJoin, null);
-            nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, hintNLJoin);
+            nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, hintNLJoin, outerJoin);
             if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                commutativeNljPlan =
-                        buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, hintNLJoin);
+                commutativeNljPlan = buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr,
+                        hintNLJoin, outerJoin);
             }
             if (nljPlan == PlanNode.NO_PLAN && commutativeNljPlan == PlanNode.NO_PLAN) {
                 // Hints are attached to predicates, so newJoinConditions should not be empty, but adding the check to be safe.
@@ -1147,39 +1197,45 @@
                                         ErrorCode.INAPPLICABLE_HINT, "index nested loop join", "ignored"));
                     }
                 }
-                hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+                hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                    commutativeHjPlan =
+                            buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null, outerJoin);
                 }
-                bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+                bcastHjPlan =
+                        buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeBcastHjPlan =
-                            buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                    commutativeBcastHjPlan = buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan,
+                            hashJoinExpr, null, outerJoin);
                 }
-                cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr);
+                cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr,
+                        outerJoin);
                 if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                    commutativeCpPlan =
-                            buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, nestedLoopJoinExpr);
+                    commutativeCpPlan = buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr,
+                            nestedLoopJoinExpr, outerJoin);
                 }
             }
         } else {
-            hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+            hjPlan = buildHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
             if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                commutativeHjPlan = buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                commutativeHjPlan =
+                        buildHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null, outerJoin);
             }
-            bcastHjPlan = buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null);
+            bcastHjPlan =
+                    buildBroadcastHashJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, null, outerJoin);
             if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
                 commutativeBcastHjPlan =
-                        buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null);
+                        buildBroadcastHashJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, null, outerJoin);
             }
-            nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, null);
+            nljPlan = buildNLJoinPlan(leftJn, rightJn, leftPlan, rightPlan, nestedLoopJoinExpr, null, outerJoin);
             if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                commutativeNljPlan = buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, null);
+                commutativeNljPlan =
+                        buildNLJoinPlan(rightJn, leftJn, rightPlan, leftPlan, nestedLoopJoinExpr, null, outerJoin);
             }
-            cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr);
+            cpPlan = buildCPJoinPlan(leftJn, rightJn, leftPlan, rightPlan, hashJoinExpr, nestedLoopJoinExpr, outerJoin);
             if (!joinEnum.forceJoinOrderMode || level <= joinEnum.cboFullEnumLevel) {
-                commutativeCpPlan =
-                        buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr, nestedLoopJoinExpr);
+                commutativeCpPlan = buildCPJoinPlan(rightJn, leftJn, rightPlan, leftPlan, hashJoinExpr,
+                        nestedLoopJoinExpr, outerJoin);
             }
         }
 
@@ -1235,16 +1291,21 @@
         List<PlanNode> allPlans = joinEnum.allPlans;
         StringBuilder sb = new StringBuilder(128);
         // This will avoid printing JoinNodes that have no plans
-        sb.append("Printing Join Node ").append(jnArrayIndex).append('\n');
-        sb.append("datasetNames ").append('\n');
+        if (IsBaseLevelJoinNode()) {
+            sb.append("\nPrinting Scan Node ").append(jnArrayIndex).append('\n');
+        } else {
+            sb.append("\nPrinting Join Node ").append(jnArrayIndex).append('\n');
+        }
+        sb.append("datasetNames ");
         for (String datasetName : datasetNames) {
-            // Need to not print newline
             sb.append(datasetName).append(' ');
         }
-        sb.append("datasetIndex ").append('\n');
+        sb.append('\n');
+        sb.append("datasetIndex ");
         for (int j = 0; j < datasetIndexes.size(); j++) {
-            sb.append(j).append(datasetIndexes.get(j)).append('\n');
+            sb.append(j).append(datasetIndexes.get(j));
         }
+        sb.append('\n');
         sb.append("datasetBits is ").append(datasetBits).append('\n');
         if (IsBaseLevelJoinNode()) {
             sb.append("orig cardinality is ").append((double) Math.round(origCardinality * 100) / 100).append('\n');
@@ -1253,11 +1314,18 @@
         if (planIndexesArray.size() == 0) {
             sb.append("No plans considered for this join node").append('\n');
         }
+        sb.append("jnIndex ").append(jnIndex).append('\n');
+        sb.append("datasetBits ").append(datasetBits).append('\n');
+        sb.append("cardinality ").append((double) Math.round(cardinality * 100) / 100).append('\n');
+        sb.append("size ").append((double) Math.round(size * 100) / 100).append('\n');
+        sb.append("level ").append(level).append('\n');
+        sb.append("highestDatasetId ").append(highestDatasetId).append('\n');
+        sb.append("----------------").append('\n');
         for (int j = 0; j < planIndexesArray.size(); j++) {
             int k = planIndexesArray.get(j);
             PlanNode pn = allPlans.get(k);
             sb.append("planIndexesArray  [").append(j).append("] is ").append(k).append('\n');
-            sb.append("Printing PlanNode ").append(k).append('\n');
+            sb.append("Printing Plan ").append(k).append('\n');
             if (IsBaseLevelJoinNode()) {
                 sb.append("DATA_SOURCE_SCAN").append('\n');
             } else {
@@ -1266,12 +1334,12 @@
                 sb.append("Printing Join expr ").append('\n');
                 if (pn.joinExpr != null) {
                     sb.append(pn.joinExpr).append('\n');
+                    sb.append("outer join " + pn.outerJoin).append('\n');
                 } else {
                     sb.append("null").append('\n');
                 }
             }
             sb.append("card ").append((double) Math.round(cardinality * 100) / 100).append('\n');
-            sb.append("------------------").append('\n');
             sb.append("operator cost ").append(pn.opCost.computeTotalCost()).append('\n');
             sb.append("total cost ").append(pn.totalCost.computeTotalCost()).append('\n');
             sb.append("jnIndexes ").append(pn.jnIndexes[0]).append(" ").append(pn.jnIndexes[1]).append('\n');
@@ -1283,9 +1351,10 @@
                 sb.append("planIndexes ").append(l).append(" ").append(r).append('\n');
                 sb.append("(lcost = ").append(leftPlan.totalCost.computeTotalCost()).append(") (rcost = ")
                         .append(rightPlan.totalCost.computeTotalCost()).append(")").append('\n');
+                sb.append("------------------").append('\n');
             }
-            sb.append("\n");
         }
+        sb.append("*****************************").append('\n');
         sb.append("jnIndex ").append(jnIndex).append('\n');
         sb.append("datasetBits ").append(datasetBits).append('\n');
         sb.append("cardinality ").append((double) Math.round(cardinality * 100) / 100).append('\n');
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinOperator.java
new file mode 100644
index 0000000..702c242
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinOperator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.optimizer.rules.cbo;
+
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+
+public class JoinOperator {
+    private final AbstractBinaryJoinOperator joinOp;
+    private boolean outerJoin;
+
+    public JoinOperator(AbstractBinaryJoinOperator joinOp) {
+        this.joinOp = joinOp;
+        outerJoin = false; // if this is really an outer join, this will be set to true
+    }
+
+    public void setOuterJoin(boolean value) {
+        outerJoin = value;
+    }
+
+    public boolean getOuterJoin() {
+        return outerJoin;
+    }
+
+    public AbstractBinaryJoinOperator getAbstractJoinOp() {
+        return joinOp;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
index da4938b..9dc7d6c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
@@ -32,6 +32,7 @@
     protected static int NO_PLAN = -1;
 
     private final JoinEnum joinEnum;
+    boolean outerJoin = false;
     protected int allPlansIndex;
     protected int[] planIndexes;
     protected int[] jnIndexes;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ASTERIXDB-2402.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ASTERIXDB-2402.plan
new file mode 100644
index 0000000..33d5a4e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ASTERIXDB-2402.plan
@@ -0,0 +1,168 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- SUBPLAN  |PARTITIONED|
+              {
+                -- AGGREGATE  |LOCAL|
+                  -- ASSIGN  |LOCAL|
+                    -- MICRO_PRE_CLUSTERED_GROUP_BY[$$230]  |LOCAL|
+                            {
+                              -- AGGREGATE  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                            }
+                      -- MICRO_STABLE_SORT [$$230(ASC)]  |LOCAL|
+                        -- ASSIGN  |LOCAL|
+                          -- UNNEST  |LOCAL|
+                            -- SUBPLAN  |LOCAL|
+                                    {
+                                      -- AGGREGATE  |LOCAL|
+                                        -- ASSIGN  |LOCAL|
+                                          -- UNNEST  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                    }
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+              }
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- COMMIT  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- INSERT_DELETE  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$214]  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$267]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- STREAM_SELECT  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$267(ASC)]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$267]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$266][$$237]  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- RUNNING_AGGREGATE  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- UNNEST  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- PRE_CLUSTERED_GROUP_BY[$$319]  |PARTITIONED|
+                                                                  {
+                                                                    -- AGGREGATE  |LOCAL|
+                                                                      -- MICRO_PRE_CLUSTERED_GROUP_BY[$$321, $$322]  |LOCAL|
+                                                                              {
+                                                                                -- AGGREGATE  |LOCAL|
+                                                                                  -- STREAM_SELECT  |LOCAL|
+                                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                              }
+                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                  }
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- STABLE_SORT [$$319(ASC), $$321(ASC), $$322(ASC)]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$319]  |PARTITIONED|
+                                                                  -- UNION_ALL  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- BTREE_SEARCH (channels.Shelters.Shelters)  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- SPLIT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- RTREE_SEARCH (channels.Shelters.s_location)  |PARTITIONED|
+                                                                                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- NESTED_LOOP  |PARTITIONED|
+                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                            -- NESTED_LOOP  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- DATASOURCE_SCAN (channels.EmergenciesNearMeChannelChannelSubscriptions)  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- DATASOURCE_SCAN (channels.Reports)  |PARTITIONED|
+                                                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- DATASOURCE_SCAN (channels.UserLocations)  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- SPLIT  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- RTREE_SEARCH (channels.Shelters.s_location)  |PARTITIONED|
+                                                                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- NESTED_LOOP  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- NESTED_LOOP  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              -- DATASOURCE_SCAN (channels.EmergenciesNearMeChannelChannelSubscriptions)  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN (channels.Reports)  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- DATASOURCE_SCAN (channels.UserLocations)  |PARTITIONED|
+                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$237]  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- HYBRID_HASH_JOIN [$$248, $$250][$$239, $$240]  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$248, $$250]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (channels.EmergenciesNearMeChannelBrokerSubscriptions)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$239, $$240]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (channels.Broker)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-01.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-01.plan
new file mode 100644
index 0000000..ee0f744
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-01.plan
@@ -0,0 +1,106 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- SORT_MERGE_EXCHANGE [$$142(ASC), $$132(ASC), $$144(ASC) ]  |PARTITIONED|
+            -- STABLE_SORT [$$142(ASC), $$132(ASC), $$144(ASC)]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- HYBRID_HASH_JOIN [$$137][$$144]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$137]  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$142][$$136]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$142]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- STREAM_SELECT  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- PRE_CLUSTERED_GROUP_BY[$$130]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- STREAM_SELECT  |LOCAL|
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STABLE_SORT [$$130(ASC)]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$139][$$85]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$139]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- REPLICATE  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.c)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- REPLICATE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$144]  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$133]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- STREAM_SELECT  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$133(ASC)]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$133]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$140][$$104]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$140]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- REPLICATE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.c)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-02.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-02.plan
new file mode 100644
index 0000000..ee0f744
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-02.plan
@@ -0,0 +1,106 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- SORT_MERGE_EXCHANGE [$$142(ASC), $$132(ASC), $$144(ASC) ]  |PARTITIONED|
+            -- STABLE_SORT [$$142(ASC), $$132(ASC), $$144(ASC)]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- HYBRID_HASH_JOIN [$$137][$$144]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$137]  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$142][$$136]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$142]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- STREAM_SELECT  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- PRE_CLUSTERED_GROUP_BY[$$130]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- STREAM_SELECT  |LOCAL|
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STABLE_SORT [$$130(ASC)]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$139][$$85]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$139]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- REPLICATE  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.c)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- REPLICATE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$144]  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$133]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- STREAM_SELECT  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$133(ASC)]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$133]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$140][$$104]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$140]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- REPLICATE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.c)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-03.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-03.plan
new file mode 100644
index 0000000..ee0f744
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-selection/intersection-misc/intersection-misc-03.plan
@@ -0,0 +1,106 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- SORT_MERGE_EXCHANGE [$$142(ASC), $$132(ASC), $$144(ASC) ]  |PARTITIONED|
+            -- STABLE_SORT [$$142(ASC), $$132(ASC), $$144(ASC)]  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- HYBRID_HASH_JOIN [$$137][$$144]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$137]  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$142][$$136]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$142]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- STREAM_SELECT  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- PRE_CLUSTERED_GROUP_BY[$$130]  |PARTITIONED|
+                                                  {
+                                                    -- AGGREGATE  |LOCAL|
+                                                      -- STREAM_SELECT  |LOCAL|
+                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                  }
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- STABLE_SORT [$$130(ASC)]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$139][$$85]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$139]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- REPLICATE  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (test.c)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- REPLICATE  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$144]  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- STREAM_SELECT  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$133]  |PARTITIONED|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- STREAM_SELECT  |LOCAL|
+                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$133(ASC)]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$133]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$140][$$104]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$140]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (test.d)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- REPLICATE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN (test.c)  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q5.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q5.plan
index 95ff63d3..19bf0b1 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q5.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q5.plan
@@ -8,12 +8,12 @@
               -- STREAM_PROJECT  |PARTITIONED|
                 -- ASSIGN  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                    -- SORT_GROUP_BY[$$291]  |PARTITIONED|
+                    -- SORT_GROUP_BY[$$292]  |PARTITIONED|
                             {
                               -- AGGREGATE  |LOCAL|
                                 -- NESTED_TUPLE_SOURCE  |LOCAL|
                             }
-                      -- HASH_PARTITION_EXCHANGE [$$291]  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$292]  |PARTITIONED|
                         -- SORT_GROUP_BY[$$275]  |PARTITIONED|
                                 {
                                   -- AGGREGATE  |LOCAL|
@@ -22,76 +22,77 @@
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- HYBRID_HASH_JOIN [$$257][$$256]  |PARTITIONED|
-                                  -- HASH_PARTITION_EXCHANGE [$$257]  |PARTITIONED|
+                                -- HYBRID_HASH_JOIN [$$274, $$290][$$266, $$269]  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$274, $$290]  |PARTITIONED|
                                     -- STREAM_PROJECT  |PARTITIONED|
-                                      -- STREAM_SELECT  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- DATASOURCE_SCAN (test.region)  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                  -- HASH_PARTITION_EXCHANGE [$$256]  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- HYBRID_HASH_JOIN [$$274, $$268][$$266, $$269]  |PARTITIONED|
-                                          -- HASH_PARTITION_EXCHANGE [$$274, $$268]  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- HYBRID_HASH_JOIN [$$258][$$274]  |PARTITIONED|
-                                                  -- HASH_PARTITION_EXCHANGE [$$258]  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- HYBRID_HASH_JOIN [$$260, $$254, $$263][$$276, $$277, $$278]  |PARTITIONED|
-                                                          -- HASH_PARTITION_EXCHANGE [$$260, $$254, $$263]  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- HYBRID_HASH_JOIN [$$245, $$246][$$254, $$281]  |PARTITIONED|
-                                                                  -- HASH_PARTITION_EXCHANGE [$$245, $$246]  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                  -- HASH_PARTITION_EXCHANGE [$$254, $$281]  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- UNNEST  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- STREAM_SELECT  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                          -- HASH_PARTITION_EXCHANGE [$$276, $$277, $$278]  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ASSIGN  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                  -- HASH_PARTITION_EXCHANGE [$$274]  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ASSIGN  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                          -- HASH_PARTITION_EXCHANGE [$$266, $$269]  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- HYBRID_HASH_JOIN [$$256][$$257]  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$256]  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                    -- HYBRID_HASH_JOIN [$$258][$$274]  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$258]  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- HYBRID_HASH_JOIN [$$260, $$254, $$263][$$276, $$277, $$278]  |PARTITIONED|
+                                                              -- HASH_PARTITION_EXCHANGE [$$260, $$254, $$263]  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- HYBRID_HASH_JOIN [$$245, $$246][$$254, $$281]  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$245, $$246]  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$254, $$281]  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- UNNEST  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
+                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- HASH_PARTITION_EXCHANGE [$$276, $$277, $$278]  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                      -- HASH_PARTITION_EXCHANGE [$$274]  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$257]  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- DATASOURCE_SCAN (test.region)  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                  -- HASH_PARTITION_EXCHANGE [$$266, $$269]  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q7.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q7.plan
index 98a1725..91cd909 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q7.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q7.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$su_nationkey(ASC), $#1(ASC), $#2(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$325, $$326, $$327]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$326, $$327, $$328]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$325, $$326, $$327]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$326, $$327, $$328]  |PARTITIONED|
               -- SORT_GROUP_BY[$$277, $$273, $$274]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -21,79 +21,80 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- HYBRID_HASH_JOIN [$$288][$$304]  |PARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$288]  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$277][$$303]  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- HYBRID_HASH_JOIN [$$277][$$303]  |PARTITIONED|
-                                        -- HASH_PARTITION_EXCHANGE [$$277]  |PARTITIONED|
+                                      -- HYBRID_HASH_JOIN [$$324][$$300]  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$324]  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- HYBRID_HASH_JOIN [$$299][$$300]  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$299]  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- HYBRID_HASH_JOIN [$$291, $$293, $$295][$$305, $$306, $$307]  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$291, $$293, $$295]  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- HYBRID_HASH_JOIN [$$275, $$276][$$310, $$311]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$275, $$276]  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
-                                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$310, $$311]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- STREAM_SELECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- HYBRID_HASH_JOIN [$$288][$$304]  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$288]  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- HYBRID_HASH_JOIN [$$291, $$293, $$295][$$305, $$306, $$307]  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$291, $$293, $$295]  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- HYBRID_HASH_JOIN [$$275, $$276][$$310, $$311]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$275, $$276]  |PARTITIONED|
                                                                       -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ASSIGN  |PARTITIONED|
                                                                           -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- UNNEST  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$310, $$311]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
                                                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                -- UNNEST  |PARTITIONED|
                                                                                   -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
+                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$305, $$306, $$307]  |PARTITIONED|
-                                                          -- ASSIGN  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- HASH_PARTITION_EXCHANGE [$$305, $$306, $$307]  |PARTITIONED|
                                                               -- ASSIGN  |PARTITIONED|
                                                                 -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$300]  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                    -- HASH_PARTITION_EXCHANGE [$$304]  |PARTITIONED|
+                                                      -- REPLICATE  |PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- REPLICATE  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$303]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                        -- HASH_PARTITION_EXCHANGE [$$300]  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- BROADCAST_EXCHANGE  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
                                     -- ASSIGN  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- REPLICATE  |PARTITIONED|
-                                          -- HASH_PARTITION_EXCHANGE [$$303]  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- STREAM_PROJECT  |PARTITIONED|
                                               -- ASSIGN  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q8.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q8.plan
index 9d2969e..16343e7 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q8.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q8.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$#1(ASC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$349]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$350]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$349]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$350]  |PARTITIONED|
               -- SORT_GROUP_BY[$$294]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -20,90 +20,91 @@
                       -- STREAM_PROJECT  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- HYBRID_HASH_JOIN [$$325][$$326]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$325]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$320][$$341]  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$320]  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$347][$$341]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$347]  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$308][$$309]  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$308]  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- HYBRID_HASH_JOIN [$$311][$$331]  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$311]  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$313, $$315, $$317][$$333, $$334, $$332]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$313, $$315, $$317]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- HYBRID_HASH_JOIN [$$306][$$324]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$306]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- HYBRID_HASH_JOIN [$$296, $$295][$$306, $$336]  |PARTITIONED|
-                                                                            -- HASH_PARTITION_EXCHANGE [$$296, $$295]  |PARTITIONED|
-                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                            -- HASH_PARTITION_EXCHANGE [$$306, $$336]  |PARTITIONED|
-                                                                              -- STREAM_SELECT  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                      -- UNNEST  |PARTITIONED|
-                                                                                        -- STREAM_SELECT  |PARTITIONED|
-                                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                                            -- ASSIGN  |PARTITIONED|
-                                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$324]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- STREAM_SELECT  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- DATASOURCE_SCAN (test.item)  |PARTITIONED|
-                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$333, $$334, $$332]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$308][$$309]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$308]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$311][$$331]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$311]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- HYBRID_HASH_JOIN [$$313, $$315, $$317][$$333, $$334, $$332]  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$313, $$315, $$317]  |PARTITIONED|
                                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
+                                                                      -- HYBRID_HASH_JOIN [$$306][$$324]  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$306]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- HYBRID_HASH_JOIN [$$296, $$295][$$306, $$336]  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$296, $$295]  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                -- HASH_PARTITION_EXCHANGE [$$306, $$336]  |PARTITIONED|
+                                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- UNNEST  |PARTITIONED|
+                                                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- ASSIGN  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                        -- HASH_PARTITION_EXCHANGE [$$324]  |PARTITIONED|
+                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                            -- STREAM_SELECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- DATASOURCE_SCAN (test.item)  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                -- HASH_PARTITION_EXCHANGE [$$333, $$334, $$332]  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$331]  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- REPLICATE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN (test.customer)  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$331]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- REPLICATE  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$309]  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- DATASOURCE_SCAN (test.region)  |PARTITIONED|
+                                                                        -- DATASOURCE_SCAN (test.nation)  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$309]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN (test.region)  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                     -- HASH_PARTITION_EXCHANGE [$$341]  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
@@ -112,7 +113,7 @@
                                               -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$326]  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ASSIGN  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q9.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q9.plan
index 9eafc6f..7283aa0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q9.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/ch2/ch2_q9.plan
@@ -3,12 +3,12 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$n_name(ASC), $#1(DESC) ]  |PARTITIONED|
-          -- SORT_GROUP_BY[$$229, $$230]  |PARTITIONED|
+          -- SORT_GROUP_BY[$$230, $$231]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_EXCHANGE [$$229, $$230]  |PARTITIONED|
+            -- HASH_PARTITION_EXCHANGE [$$230, $$231]  |PARTITIONED|
               -- SORT_GROUP_BY[$$225, $$198]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
@@ -20,48 +20,49 @@
                       -- STREAM_PROJECT  |PARTITIONED|
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- HYBRID_HASH_JOIN [$$213][$$214]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$213]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$210][$$224]  |PARTITIONED|
-                                    -- HASH_PARTITION_EXCHANGE [$$210]  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$228][$$224]  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$228]  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- HYBRID_HASH_JOIN [$$206][$$207]  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$206]  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- HYBRID_HASH_JOIN [$$200, $$199][$$206, $$217]  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$200, $$199]  |PARTITIONED|
+                                        -- ASSIGN  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$206][$$207]  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$206]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- HYBRID_HASH_JOIN [$$200, $$199][$$206, $$217]  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$200, $$199]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$206, $$217]  |PARTITIONED|
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- ASSIGN  |PARTITIONED|
+                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                -- UNNEST  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ASSIGN  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- STREAM_SELECT  |PARTITIONED|
                                                       -- ASSIGN  |PARTITIONED|
                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- ASSIGN  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- DATASOURCE_SCAN (test.stock)  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$206, $$217]  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ASSIGN  |PARTITIONED|
-                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                            -- UNNEST  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- DATASOURCE_SCAN (test.orders)  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                            -- HASH_PARTITION_EXCHANGE [$$207]  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- DATASOURCE_SCAN (test.item)  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- DATASOURCE_SCAN (test.item)  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                     -- HASH_PARTITION_EXCHANGE [$$224]  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
@@ -70,7 +71,7 @@
                                               -- DATASOURCE_SCAN (test.supplier)  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$214]  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
                                 -- ASSIGN  |PARTITIONED|
                                   -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/extract-common-operators/extract-common-operators.01.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/extract-common-operators/extract-common-operators.01.plan
index 29aafbf..c66d648 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/extract-common-operators/extract-common-operators.01.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/extract-common-operators/extract-common-operators.01.plan
@@ -13,78 +13,76 @@
                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- HYBRID_HASH_JOIN [$$414][$$408]  |PARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$414]  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- REPLICATE  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- DATASOURCE_SCAN (Metadata.Synonym)  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                -- HASH_PARTITION_EXCHANGE [$$408]  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- HYBRID_HASH_JOIN [$$408][$$ds_name]  |PARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- REPLICATE  |PARTITIONED|
-                                            -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- BTREE_SEARCH (Metadata.Dataset.Dataset)  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                      -- UNNEST  |UNPARTITIONED|
-                                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- REPLICATE  |PARTITIONED|
-                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                              -- UNNEST  |UNPARTITIONED|
-                                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                    -- HASH_PARTITION_EXCHANGE [$$410]  |PARTITIONED|
-                      -- STREAM_PROJECT  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- HYBRID_HASH_JOIN [$$428][$$412]  |PARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$428]  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
+                              -- HYBRID_HASH_JOIN [$$408][$$ds_name]  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$410][$$syn_name]  |PARTITIONED|
-                                    -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
-                                        -- ASSIGN  |PARTITIONED|
-                                          -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- REPLICATE  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- DATASOURCE_SCAN (Metadata.Synonym)  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      -- UNNEST  |UNPARTITIONED|
-                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                            -- HASH_PARTITION_EXCHANGE [$$412]  |PARTITIONED|
-                              -- STREAM_PROJECT  |PARTITIONED|
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- HYBRID_HASH_JOIN [$$412][$$ds_name]  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$408][$$414]  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                             -- REPLICATE  |PARTITIONED|
-                                              -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$412]  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                     -- BTREE_SEARCH (Metadata.Dataset.Dataset)  |PARTITIONED|
                                                       -- BROADCAST_EXCHANGE  |PARTITIONED|
                                                         -- UNNEST  |UNPARTITIONED|
                                                           -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- HASH_PARTITION_EXCHANGE [$$414]  |PARTITIONED|
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- REPLICATE  |PARTITIONED|
-                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                -- UNNEST  |UNPARTITIONED|
-                                                  -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- REPLICATE  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN (Metadata.Synonym)  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- REPLICATE  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- UNNEST  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                    -- HASH_PARTITION_EXCHANGE [$$410]  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- HYBRID_HASH_JOIN [$$412][$$ds_name]  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- HYBRID_HASH_JOIN [$$410][$$syn_name]  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$412][$$428]  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- REPLICATE  |PARTITIONED|
+                                                -- HASH_PARTITION_EXCHANGE [$$412]  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- BTREE_SEARCH (Metadata.Dataset.Dataset)  |PARTITIONED|
+                                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                          -- UNNEST  |UNPARTITIONED|
+                                                            -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$428]  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- REPLICATE  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          -- DATASOURCE_SCAN (Metadata.Synonym)  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- UNNEST  |UNPARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- REPLICATE  |PARTITIONED|
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        -- UNNEST  |UNPARTITIONED|
+                                          -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/leftouterjoin/query-ASTERIXDB-2857.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/leftouterjoin/query-ASTERIXDB-2857.plan
new file mode 100644
index 0000000..18934ba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/leftouterjoin/query-ASTERIXDB-2857.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$145(ASC), $$146(ASC), $#3(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$145(ASC), $$146(ASC), $#3(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- NESTED_LOOP  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$136][$$137]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH (test.tenk.tenk)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$137]  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH (test.tenk.tenk)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH (test.tenk.tenk)  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1581.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1581.plan
new file mode 100644
index 0000000..2913fce
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1581.plan
@@ -0,0 +1,204 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- NESTED_LOOP  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- BTREE_SEARCH (tpcds.item.item)  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- ASSIGN  |PARTITIONED|
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      -- BROADCAST_EXCHANGE  |LOCAL|
+        -- STREAM_PROJECT  |LOCAL|
+          -- ASSIGN  |LOCAL|
+            -- STREAM_PROJECT  |LOCAL|
+              -- UNNEST  |LOCAL|
+                -- STREAM_PROJECT  |LOCAL|
+                  -- ASSIGN  |LOCAL|
+                    -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                      -- PRE_CLUSTERED_GROUP_BY[$$173]  |LOCAL|
+                              {
+                                -- AGGREGATE  |LOCAL|
+                                  -- AGGREGATE  |LOCAL|
+                                    -- STREAM_SELECT  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                              }
+                        -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                          -- STREAM_PROJECT  |LOCAL|
+                            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                              -- HYBRID_HASH_JOIN [$$173][$$174]  |LOCAL|
+                                -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                  -- PRE_CLUSTERED_GROUP_BY[$$109]  |LOCAL|
+                                          {
+                                            -- AGGREGATE  |LOCAL|
+                                              -- AGGREGATE  |LOCAL|
+                                                -- STREAM_SELECT  |LOCAL|
+                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          }
+                                    -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                      -- STABLE_SORT [$$109(ASC)]  |LOCAL|
+                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                          -- STREAM_PROJECT  |UNPARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                              -- HYBRID_HASH_JOIN [$$109][$$166]  |UNPARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                  -- STREAM_PROJECT  |UNPARTITIONED|
+                                                    -- ASSIGN  |UNPARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                        -- REPLICATE  |UNPARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                            -- STREAM_PROJECT  |UNPARTITIONED|
+                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                  -- REPLICATE  |UNPARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                      -- AGGREGATE  |UNPARTITIONED|
+                                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                            -- AGGREGATE  |PARTITIONED|
+                                                                              -- STREAM_SELECT  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                  -- NESTED_LOOP  |UNPARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                      -- STREAM_PROJECT  |UNPARTITIONED|
+                                                        -- ASSIGN  |UNPARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                            -- REPLICATE  |UNPARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                -- ASSIGN  |UNPARTITIONED|
+                                                                  -- STREAM_SELECT  |UNPARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                      -- REPLICATE  |UNPARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                          -- AGGREGATE  |UNPARTITIONED|
+                                                                            -- AGGREGATE  |UNPARTITIONED|
+                                                                              -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                -- AGGREGATE  |PARTITIONED|
+                                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                      -- STREAM_PROJECT  |UNPARTITIONED|
+                                                        -- ASSIGN  |UNPARTITIONED|
+                                                          -- AGGREGATE  |UNPARTITIONED|
+                                                            -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                              -- AGGREGATE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
+                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                  -- NESTED_LOOP  |LOCAL|
+                                    -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                      -- ASSIGN  |LOCAL|
+                                        -- STREAM_PROJECT  |LOCAL|
+                                          -- STREAM_SELECT  |LOCAL|
+                                            -- ASSIGN  |LOCAL|
+                                              -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                                -- PRE_CLUSTERED_GROUP_BY[$$175]  |LOCAL|
+                                                        {
+                                                          -- AGGREGATE  |LOCAL|
+                                                            -- AGGREGATE  |LOCAL|
+                                                              -- STREAM_SELECT  |LOCAL|
+                                                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                        }
+                                                  -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                                                    -- STABLE_SORT [$$175(ASC)]  |LOCAL|
+                                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                        -- STREAM_PROJECT  |UNPARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                            -- HYBRID_HASH_JOIN [$$175][$$176]  |UNPARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                -- REPLICATE  |UNPARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                    -- STREAM_PROJECT  |UNPARTITIONED|
+                                                                      -- ASSIGN  |UNPARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                          -- REPLICATE  |UNPARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                              -- AGGREGATE  |UNPARTITIONED|
+                                                                                -- AGGREGATE  |UNPARTITIONED|
+                                                                                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                    -- AGGREGATE  |PARTITIONED|
+                                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                -- REPLICATE  |UNPARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                    -- ASSIGN  |UNPARTITIONED|
+                                                                      -- STREAM_SELECT  |UNPARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                          -- REPLICATE  |UNPARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                              -- AGGREGATE  |UNPARTITIONED|
+                                                                                -- AGGREGATE  |UNPARTITIONED|
+                                                                                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                    -- AGGREGATE  |PARTITIONED|
+                                                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- ASSIGN  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                      -- STREAM_PROJECT  |UNPARTITIONED|
+                                        -- ASSIGN  |UNPARTITIONED|
+                                          -- AGGREGATE  |UNPARTITIONED|
+                                            -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                              -- AGGREGATE  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- STREAM_SELECT  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ASSIGN  |PARTITIONED|
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- REPLICATE  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1591.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1591.plan
index bdf16c1..17eb2c4 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1591.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/tpcds/query-ASTERIXDB-1591.plan
@@ -68,41 +68,41 @@
                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- HYBRID_HASH_JOIN [$$150][$$167]  |PARTITIONED|
-                                                                                        -- HASH_PARTITION_EXCHANGE [$$150]  |PARTITIONED|
+                                                                                      -- HYBRID_HASH_JOIN [$$161][$$151]  |PARTITIONED|
+                                                                                        -- HASH_PARTITION_EXCHANGE [$$161]  |PARTITIONED|
                                                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- HYBRID_HASH_JOIN [$$161][$$151]  |PARTITIONED|
-                                                                                                -- HASH_PARTITION_EXCHANGE [$$161]  |PARTITIONED|
+                                                                                              -- HYBRID_HASH_JOIN [$$150][$$167]  |PARTITIONED|
+                                                                                                -- HASH_PARTITION_EXCHANGE [$$150]  |PARTITIONED|
                                                                                                   -- ASSIGN  |PARTITIONED|
                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                       -- DATASOURCE_SCAN (tpcds.customer)  |PARTITIONED|
                                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                                -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
-                                                                                                  -- DATASOURCE_SCAN (tpcds.customer_address)  |PARTITIONED|
-                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                        -- HASH_PARTITION_EXCHANGE [$$167]  |PARTITIONED|
-                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- HYBRID_HASH_JOIN [$$172][$$154]  |PARTITIONED|
-                                                                                                  -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
-                                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                          -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
-                                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                    -- REPLICATE  |PARTITIONED|
-                                                                                                      -- HASH_PARTITION_EXCHANGE [$$154]  |PARTITIONED|
-                                                                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                                                                -- HASH_PARTITION_EXCHANGE [$$167]  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                        -- HYBRID_HASH_JOIN [$$172][$$154]  |PARTITIONED|
+                                                                                                          -- HASH_PARTITION_EXCHANGE [$$172]  |PARTITIONED|
+                                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN (tpcds.store_sales)  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                            -- DATASOURCE_SCAN (tpcds.date_dim)  |PARTITIONED|
-                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                            -- REPLICATE  |PARTITIONED|
+                                                                                                              -- HASH_PARTITION_EXCHANGE [$$154]  |PARTITIONED|
+                                                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                    -- DATASOURCE_SCAN (tpcds.date_dim)  |PARTITIONED|
+                                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                        -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
+                                                                                          -- DATASOURCE_SCAN (tpcds.customer_address)  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                               -- HASH_PARTITION_EXCHANGE [$$169]  |PARTITIONED|
                                                                 -- ASSIGN  |PARTITIONED|
                                                                   -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.014.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.014.plan
new file mode 100644
index 0000000..6db5e85
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.014.plan
@@ -0,0 +1,111 @@
+distribute result [$$101] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$101]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$101] <- [{"uname": $$uname, "cnt": $$105}] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+        -- SORT_MERGE_EXCHANGE [$$105(DESC), $$uname(ASC) ]  |PARTITIONED|
+          order (DESC, $$105) (ASC, $$uname) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+          -- STABLE_SORT [$$105(DESC), $$uname(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              group by ([$$uname := $$116]) decor ([]) {
+                        aggregate [$$105] <- [agg-sql-sum($$115)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- AGGREGATE  |LOCAL|
+                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                     } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+              -- SORT_GROUP_BY[$$116]  |PARTITIONED|
+                exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                -- HASH_PARTITION_EXCHANGE [$$116]  |PARTITIONED|
+                  group by ([$$116 := $$102]) decor ([]) {
+                            aggregate [$$115] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                  -- SORT_GROUP_BY[$$102]  |PARTITIONED|
+                    exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      project ([$$102]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        select ($$92) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |PARTITIONED|
+                          project ([$$92, $$102]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              group by ([$$114 := $$112]) decor ([$$102]) {
+                                        aggregate [$$92] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- AGGREGATE  |LOCAL|
+                                          select (not(is-missing($$113))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_SELECT  |LOCAL|
+                                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                     } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                              -- PRE_CLUSTERED_GROUP_BY[$$112]  |PARTITIONED|
+                                exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  order (ASC, $$112) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                  -- STABLE_SORT [$$112(ASC)]  |PARTITIONED|
+                                    exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                    -- HASH_PARTITION_EXCHANGE [$$112]  |PARTITIONED|
+                                      project ([$$102, $$113, $$112]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          left outer join (eq($$106, $$88)) [cardinality: 2.21, op-cost: 4.2, total-cost: 12.6]
+                                          -- HYBRID_HASH_JOIN [$$106][$$88]  |PARTITIONED|
+                                            exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                            -- HASH_PARTITION_EXCHANGE [$$106]  |PARTITIONED|
+                                              running-aggregate [$$112] <- [create-query-uid()] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                              -- RUNNING_AGGREGATE  |PARTITIONED|
+                                                project ([$$102, $$106]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  assign [$$106] <- [$$ht1.getField("display_url")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    project ([$$102, $$ht1]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      unnest $$ht1 <- scan-collection($$107) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                      -- UNNEST  |PARTITIONED|
+                                                        project ([$$107, $$102]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                          assign [$$107, $$102] <- [$$p1.getField("entities").getField("urls"), $$p1.getField("user").getField("name")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                          -- ASSIGN  |PARTITIONED|
+                                                            project ([$$p1]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.1]
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                data-scan []<-[$$103, $$p1] <- test.ColumnDataset project ({entities:{urls:[{display_url:any}]},user:{name:any}}) [cardinality: 2.0, op-cost: 2.1, total-cost: 2.1]
+                                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- HASH_PARTITION_EXCHANGE [$$88]  |PARTITIONED|
+                                              project ([$$113, $$88]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                assign [$$113, $$88] <- [true, $$ht2.getField("display_url")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$ht2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    unnest $$ht2 <- scan-collection($$108) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- UNNEST  |PARTITIONED|
+                                                      project ([$$108]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        assign [$$108] <- [$$p2.getField("entities").getField("urls")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          project ([$$p2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              data-scan []<-[$$104, $$p2] <- test.RowDataset [cardinality: 2.0, op-cost: 2.1, total-cost: 2.1]
+                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.08.plan
new file mode 100644
index 0000000..bb2f997
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.08.plan
@@ -0,0 +1,50 @@
+distribute result [$$38] [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$38]) [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$38] <- [{"o_orderkey": $$43, "l_orderkey": $$44, "l_suppkey": $$47}] [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+        -- SORT_MERGE_EXCHANGE [$$43(ASC), $$44(ASC), $$47(ASC) ]  |PARTITIONED|
+          order (ASC, $$43) (ASC, $$44) (ASC, $$47) [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+          -- STABLE_SORT [$$43(ASC), $$44(ASC), $$47(ASC)]  |PARTITIONED|
+            exchange [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$43, $$44, $$47]) [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  left outer join (and(eq($$43, $$44), eq($$56, $$47))) [cardinality: 6005.0, op-cost: 7505.0, total-cost: 22515.0]
+                  -- HYBRID_HASH_JOIN [$$43, $$56][$$44, $$47]  |PARTITIONED|
+                    exchange [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+                    -- HASH_PARTITION_EXCHANGE [$$43, $$56]  |PARTITIONED|
+                      assign [$$56] <- [$$43] [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+                      -- ASSIGN  |PARTITIONED|
+                        project ([$$43]) [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          exchange [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            data-scan []<-[$$43, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
+                            -- DATASOURCE_SCAN  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$$44, $$47]  |PARTITIONED|
+                      project ([$$44, $$47]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$47] <- [$$l.getField(2)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$44, $$l]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$44, $$45, $$l] <- tpch.LineItem [cardinality: 6005.0, op-cost: 6005.0, total-cost: 6005.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|