[ASTERIXDB-3363][COMP] CBO not choosing indexnl join when there are
multiple join predicates.

Change-Id: Ic32b6a5d7b3c7b336d0f8aa2ee5add5d6cda59b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18196
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: <murali.krishna@couchbase.com>
Reviewed-by: Vijay Sarathy <vijay.sarathy@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/CostMethods.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/CostMethods.java
index 3de972a..71e9b3c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/CostMethods.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/CostMethods.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.optimizer.rules.cbo.JoinNode;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -129,7 +130,7 @@
         return new Cost(DOP * rightJn.getCardinality());
     }
 
-    public Cost costIndexNLJoin(JoinNode jn) {
+    public Cost costIndexNLJoin(JoinNode jn, Index index) {
         JoinNode leftJn = jn.getLeftJn();
         JoinNode rightJn = jn.getRightJn();
         double origRightCard = rightJn.getOrigCardinality();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/ICostMethods.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/ICostMethods.java
index a1dd374..d8dd77f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/ICostMethods.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/cost/ICostMethods.java
@@ -19,6 +19,7 @@
 
 package org.apache.asterix.optimizer.cost;
 
+import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.optimizer.rules.cbo.JoinNode;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
@@ -36,7 +37,7 @@
 
     Cost costBroadcastHashJoin(JoinNode currentJn);
 
-    Cost costIndexNLJoin(JoinNode currentJn);
+    Cost costIndexNLJoin(JoinNode currentJn, Index index);
 
     Cost costCartesianProductJoin(JoinNode currentJn);
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 4eb7ff3..b5041f0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -199,9 +199,11 @@
      * Simply picks the first index that it finds. TODO: Improve this decision
      * process by making it more systematic.
      */
-    protected Pair<IAccessMethod, Index> chooseBestIndex(Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
+    protected Pair<IAccessMethod, Index> chooseBestIndex(Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs,
+            List<Pair<IAccessMethod, Index>> chosenIndexes) {
         List<Pair<IAccessMethod, Index>> list = new ArrayList<>();
         chooseAllIndexes(analyzedAMs, list);
+        chosenIndexes.addAll(list);
         return list.isEmpty() ? null : list.get(0);
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index b2b5e7c..b7ae1b0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -140,7 +141,9 @@
         afterJoinRefs = new ArrayList<>();
         // Recursively checks the given plan whether the desired pattern exists in it.
         // If so, try to optimize the plan.
-        boolean planTransformed = checkAndApplyJoinTransformation(opRef, context, false);
+        List<Pair<IAccessMethod, Index>> chosenIndexes = new ArrayList<>();
+        Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs = new TreeMap<>();
+        boolean planTransformed = checkAndApplyJoinTransformation(opRef, context, false, chosenIndexes, analyzedAMs);
 
         if (joinOp != null) {
             // We found an optimization here. Don't need to optimize this operator again.
@@ -156,7 +159,8 @@
         return planTransformed;
     }
 
-    public boolean checkApplicable(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+    public boolean checkApplicable(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
+            List<Pair<IAccessMethod, Index>> chosenIndexes, Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs)
             throws AlgebricksException {
         clear();
         setMetadataDeclarations(context);
@@ -166,7 +170,7 @@
         afterJoinRefs = new ArrayList<>();
         // Recursively checks the given plan whether the desired pattern exists in it.
         // If so, try to optimize the plan.
-        boolean planTransformed = checkAndApplyJoinTransformation(opRef, context, true);
+        boolean planTransformed = checkAndApplyJoinTransformation(opRef, context, true, chosenIndexes, analyzedAMs);
 
         if (!planTransformed) {
             return false;
@@ -256,7 +260,8 @@
      * if it is not already optimized.
      */
     protected boolean checkAndApplyJoinTransformation(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
-            boolean checkApplicableOnly) throws AlgebricksException {
+            boolean checkApplicableOnly, List<Pair<IAccessMethod, Index>> chosenIndexes,
+            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
         boolean joinFoundAndOptimizationApplied;
 
@@ -267,7 +272,8 @@
         // Recursively check the plan and try to optimize it. We first check the children of the given operator
         // to make sure an earlier join in the path is optimized first.
         for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
-            joinFoundAndOptimizationApplied = checkAndApplyJoinTransformation(inputOpRef, context, checkApplicableOnly);
+            joinFoundAndOptimizationApplied = checkAndApplyJoinTransformation(inputOpRef, context, checkApplicableOnly,
+                    chosenIndexes, analyzedAMs);
             if (joinFoundAndOptimizationApplied) {
                 return true;
             }
@@ -321,8 +327,7 @@
 
             // For each access method, this contains the information about
             // whether an available index can be applicable or not.
-            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs = null;
-            if (continueCheck) {
+            if (!checkApplicableOnly && continueCheck) {
                 analyzedAMs = new HashMap<>();
             }
 
@@ -334,7 +339,8 @@
                 // the subplan into the index branch and giving the join a condition for this rule to optimize.
                 // *No nodes* from this rewrite will be used beyond this point.
                 joinFromSubplanRewrite.findAfterSubplanSelectOperator(afterJoinRefs);
-                if (rewriteLocallyAndTransform(joinRef, context, joinFromSubplanRewrite, checkApplicableOnly)) {
+                if (rewriteLocallyAndTransform(joinRef, context, joinFromSubplanRewrite, checkApplicableOnly,
+                        chosenIndexes, analyzedAMs)) {
                     // Connect the after-join operators to the index subtree root before this rewrite. This also avoids
                     // performing the secondary index validation step twice.
                     ILogicalOperator lastAfterJoinOp = afterJoinRefs.get(afterJoinRefs.size() - 1).getValue();
@@ -393,7 +399,7 @@
 
                 // We are going to use indexes from the inner branch.
                 // If no index is available, then we stop here.
-                Pair<IAccessMethod, Index> chosenIndex = chooseBestIndex(analyzedAMs);
+                Pair<IAccessMethod, Index> chosenIndex = chooseBestIndex(analyzedAMs, chosenIndexes);
                 if (chosenIndex == null) {
                     context.addToDontApplySet(this, joinOp);
                     continueCheck = false;
@@ -512,13 +518,15 @@
     }
 
     private boolean rewriteLocallyAndTransform(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
-            IIntroduceAccessMethodRuleLocalRewrite<AbstractBinaryJoinOperator> rewriter, boolean checkApplicableOnly)
+            IIntroduceAccessMethodRuleLocalRewrite<AbstractBinaryJoinOperator> rewriter, boolean checkApplicableOnly,
+            List<Pair<IAccessMethod, Index>> chosenIndexes, Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs)
             throws AlgebricksException {
         AbstractBinaryJoinOperator joinRewrite = rewriter.createOperator(joinOp, context);
         boolean transformationResult = false;
         if (joinRewrite != null) {
             Mutable<ILogicalOperator> joinRuleInput = new MutableObject<>(joinRewrite);
-            transformationResult = checkAndApplyJoinTransformation(joinRuleInput, context, checkApplicableOnly);
+            transformationResult = checkAndApplyJoinTransformation(joinRuleInput, context, checkApplicableOnly,
+                    chosenIndexes, analyzedAMs);
         }
 
         // Restore our state, so we can look for more optimizations if this transformation failed.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index 559b338..4a97acb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -877,8 +877,11 @@
             // this annotation is needed for the physical optimizer to replace this with the unnest operator later
             AbstractFunctionCallExpression afcExpr = (AbstractFunctionCallExpression) expr;
             removeJoinAnnotations(afcExpr);
-            setAnnotation(afcExpr,
-                    plan.joinHint != null ? plan.joinHint : IndexedNLJoinExpressionAnnotation.INSTANCE_ANY_INDEX);
+            if (plan.joinHint != null) {
+                setAnnotation(afcExpr, plan.joinHint);
+            } else {
+                setAnnotation(plan.exprAndHint.first, plan.exprAndHint.second);
+            }
             if (LOGGER.isTraceEnabled()) {
                 LOGGER.trace("Added IndexedNLJoinExpressionAnnotation to " + afcExpr.toString());
             }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index bb7485b..892a4ea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -267,12 +267,22 @@
     }
 
     protected ILogicalExpression getNestedLoopJoinExpr(List<Integer> newJoinConditions) {
-        if (newJoinConditions.size() != 1) {
-            // may remove this restriction later if possible
-            return null;
+        if (newJoinConditions.size() == 0) {
+            // this is a cartesian product
+            return ConstantExpression.TRUE;
         }
-        JoinCondition jc = joinConditions.get(newJoinConditions.get(0));
-        return jc.joinCondition;
+        if (newJoinConditions.size() == 1) {
+            JoinCondition jc = joinConditions.get(newJoinConditions.get(0));
+            return jc.joinCondition;
+        }
+        ScalarFunctionCallExpression andExpr = new ScalarFunctionCallExpression(
+                BuiltinFunctions.getBuiltinFunctionInfo(AlgebricksBuiltinFunctions.AND));
+        for (int joinNum : newJoinConditions) {
+            // need to AND all the expressions.
+            JoinCondition jc = joinConditions.get(joinNum);
+            andExpr.getArguments().add(new MutableObject<>(jc.joinCondition));
+        }
+        return andExpr;
     }
 
     protected ILogicalExpression getHashJoinExpr(List<Integer> newJoinConditions) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
index 099dc4d..0ff630d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
@@ -848,37 +848,46 @@
         return true;
     }
 
-    private boolean nestedLoopsApplicable(ILogicalExpression joinExpr) throws AlgebricksException {
-
-        List<LogicalVariable> usedVarList = new ArrayList<>();
-        joinExpr.getUsedVariables(usedVarList);
-        if (usedVarList.size() != 2) {
-            return false;
-        }
+    private boolean nestedLoopsApplicable(ILogicalExpression joinExpr, boolean outerJoin,
+            List<Pair<IAccessMethod, Index>> chosenIndexes, Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs)
+            throws AlgebricksException {
 
         if (joinExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
             return false;
         }
-
-        LogicalVariable var0 = usedVarList.get(0);
-        LogicalVariable var1 = usedVarList.get(1);
-
+        List<LogicalVariable> usedVarList = new ArrayList<>();
+        joinExpr.getUsedVariables(usedVarList);
+        if (usedVarList.size() != 2 && outerJoin) {
+            return false;
+        }
+        ILogicalOperator joinLeafInput0 = null;
+        ILogicalOperator joinLeafInput1 = null;
         // Find which joinLeafInput these vars belong to.
-        // go thru the leaf inputs and see where these variables came from
-        ILogicalOperator joinLeafInput0 = joinEnum.findLeafInput(Collections.singletonList(var0));
-        if (joinLeafInput0 == null) {
-            return false; // this should not happen unless an assignment is between two joins.
+        // go through the leaf inputs and see where these variables came from
+        for (LogicalVariable usedVar : usedVarList) {
+            ILogicalOperator joinLeafInput = joinEnum.findLeafInput(Collections.singletonList(usedVar));
+            if (joinLeafInput0 == null) {
+                joinLeafInput0 = joinLeafInput;
+            } else if (joinLeafInput1 == null && joinLeafInput != joinLeafInput0) {
+                joinLeafInput1 = joinLeafInput;
+            }
+            // This check ensures that the used variables in the join expression
+            // refer to not more than two leaf inputs.
+            if (joinLeafInput != joinLeafInput0 && joinLeafInput != joinLeafInput1) {
+                return false;
+            }
         }
 
-        ILogicalOperator joinLeafInput1 = joinEnum.findLeafInput(Collections.singletonList(var1));
-        if (joinLeafInput1 == null) {
+        // This check ensures that the used variables in the join expression
+        // refer to not less than two leaf inputs.
+        if (joinLeafInput0 == null || joinLeafInput1 == null) {
             return false;
         }
 
         ILogicalOperator innerLeafInput = this.leafInput;
 
         // This must equal one of the two joinLeafInputsHashMap found above. check for sanity!!
-        if (innerLeafInput != joinLeafInput1 && innerLeafInput != joinLeafInput0) {
+        if (innerLeafInput != joinLeafInput0 && innerLeafInput != joinLeafInput1) {
             return false; // This should not happen. So debug to find out why this happened.
         }
 
@@ -896,13 +905,15 @@
 
         // Now call the rewritePre code
         IntroduceJoinAccessMethodRule tmp = new IntroduceJoinAccessMethodRule();
-        boolean retVal = tmp.checkApplicable(new MutableObject<>(joinEnum.localJoinOp), joinEnum.optCtx);
+        boolean retVal = tmp.checkApplicable(new MutableObject<>(joinEnum.localJoinOp), joinEnum.optCtx, chosenIndexes,
+                analyzedAMs);
 
         return retVal;
     }
 
     private boolean NLJoinApplicable(JoinNode leftJn, JoinNode rightJn, boolean outerJoin,
-            ILogicalExpression nestedLoopJoinExpr) throws AlgebricksException {
+            ILogicalExpression nestedLoopJoinExpr, List<Pair<IAccessMethod, Index>> chosenIndexes,
+            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) throws AlgebricksException {
         if (nullExtendingSide(leftJn.datasetBits, outerJoin)) {
             return false;
         }
@@ -912,7 +923,8 @@
             return false; // nested loop plan not possible.
         }
 
-        if (nestedLoopJoinExpr == null || !rightJn.nestedLoopsApplicable(nestedLoopJoinExpr)) {
+        if (nestedLoopJoinExpr == null
+                || !rightJn.nestedLoopsApplicable(nestedLoopJoinExpr, outerJoin, chosenIndexes, analyzedAMs)) {
             return false;
         }
 
@@ -958,7 +970,7 @@
             totalCost = hjCost.costAdd(leftExchangeCost).costAdd(rightExchangeCost).costAdd(childCosts);
             if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
                 pn = new PlanNode(allPlans.size(), joinEnum, this, leftPlan, rightPlan, outerJoin);
-                pn.setJoinAndHintInfo(PlanNode.JoinMethod.HYBRID_HASH_JOIN, hashJoinExpr,
+                pn.setJoinAndHintInfo(PlanNode.JoinMethod.HYBRID_HASH_JOIN, hashJoinExpr, null,
                         HashJoinExpressionAnnotation.BuildSide.RIGHT, hintHashJoin);
                 pn.setJoinCosts(hjCost, totalCost, leftExchangeCost, rightExchangeCost);
                 planIndexesArray.add(pn.allPlansIndex);
@@ -998,7 +1010,7 @@
             totalCost = bcastHjCost.costAdd(rightExchangeCost).costAdd(childCosts);
             if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
                 pn = new PlanNode(allPlans.size(), joinEnum, this, leftPlan, rightPlan, outerJoin);
-                pn.setJoinAndHintInfo(PlanNode.JoinMethod.BROADCAST_HASH_JOIN, hashJoinExpr,
+                pn.setJoinAndHintInfo(PlanNode.JoinMethod.BROADCAST_HASH_JOIN, hashJoinExpr, null,
                         HashJoinExpressionAnnotation.BuildSide.RIGHT, hintBroadcastHashJoin);
                 pn.setJoinCosts(bcastHjCost, totalCost, leftExchangeCost, rightExchangeCost);
                 planIndexesArray.add(pn.allPlansIndex);
@@ -1024,11 +1036,39 @@
         this.leftJn = leftJn;
         this.rightJn = rightJn;
 
-        if (!NLJoinApplicable(leftJn, rightJn, outerJoin, nestedLoopJoinExpr)) {
+        List<Pair<IAccessMethod, Index>> chosenIndexes = new ArrayList<>();
+        Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs = new TreeMap<>();
+        if (!NLJoinApplicable(leftJn, rightJn, outerJoin, nestedLoopJoinExpr, chosenIndexes, analyzedAMs)) {
+            return PlanNode.NO_PLAN;
+        }
+        if (chosenIndexes.isEmpty()) {
             return PlanNode.NO_PLAN;
         }
 
-        nljCost = joinEnum.getCostMethodsHandle().costIndexNLJoin(this);
+        Pair<AbstractFunctionCallExpression, IndexedNLJoinExpressionAnnotation> exprAndHint = new Pair<>(null, null);
+        nljCost = joinEnum.getCostHandle().maxCost();
+        ICost curNljCost;
+        for (Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry : analyzedAMs.entrySet()) {
+            AccessMethodAnalysisContext analysisCtx = amEntry.getValue();
+            Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt =
+                    analysisCtx.getIteratorForIndexExprsAndVars();
+            List<IOptimizableFuncExpr> exprs = analysisCtx.getMatchedFuncExprs();
+            while (indexIt.hasNext()) {
+                Collection<String> indexNames = new ArrayList<>();
+                Map.Entry<Index, List<Pair<Integer, Integer>>> indexEntry = indexIt.next();
+                Index index = indexEntry.getKey();
+                AbstractFunctionCallExpression afce = buildExpr(exprs, indexEntry.getValue());
+                curNljCost = joinEnum.getCostMethodsHandle().costIndexNLJoin(this, index);
+                if (curNljCost.costLE(nljCost)) {
+                    nljCost = curNljCost;
+                    indexNames.add(index.getIndexName());
+                    exprAndHint = new Pair<>(afce, IndexedNLJoinExpressionAnnotation.newInstance(indexNames));
+                }
+            }
+        }
+        if (exprAndHint.first == null) {
+            return PlanNode.NO_PLAN;
+        }
         leftExchangeCost = joinEnum.getCostMethodsHandle().computeNLJOuterExchangeCost(this);
         rightExchangeCost = joinEnum.getCostHandle().zeroCost();
         childCosts = allPlans.get(leftPlan.allPlansIndex).totalCost;
@@ -1037,8 +1077,8 @@
                 hintNLJoin != null || joinEnum.forceJoinOrderMode || outerJoin || level <= joinEnum.cboFullEnumLevel;
         if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
             pn = new PlanNode(allPlans.size(), joinEnum, this, leftPlan, rightPlan, outerJoin);
-
-            pn.setJoinAndHintInfo(PlanNode.JoinMethod.INDEX_NESTED_LOOP_JOIN, nestedLoopJoinExpr, null, hintNLJoin);
+            pn.setJoinAndHintInfo(PlanNode.JoinMethod.INDEX_NESTED_LOOP_JOIN, nestedLoopJoinExpr, exprAndHint, null,
+                    hintNLJoin);
             pn.setJoinCosts(nljCost, totalCost, leftExchangeCost, rightExchangeCost);
             planIndexesArray.add(pn.allPlansIndex);
             allPlans.add(pn);
@@ -1093,7 +1133,7 @@
         if (this.cheapestPlanIndex == PlanNode.NO_PLAN || totalCost.costLT(this.cheapestPlanCost) || forceEnum) {
             pn = new PlanNode(allPlans.size(), joinEnum, this, leftPlan, rightPlan, outerJoin);
             pn.setJoinAndHintInfo(PlanNode.JoinMethod.CARTESIAN_PRODUCT_JOIN,
-                    Objects.requireNonNullElse(cpJoinExpr, ConstantExpression.TRUE), null, null);
+                    Objects.requireNonNullElse(cpJoinExpr, ConstantExpression.TRUE), null, null, null);
             pn.setJoinCosts(cpCost, totalCost, leftExchangeCost, rightExchangeCost);
             planIndexesArray.add(pn.allPlansIndex);
             allPlans.add(pn);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
index e514f5e..e51990e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
@@ -21,6 +21,7 @@
 
 import java.util.List;
 
+import org.apache.asterix.common.annotations.IndexedNLJoinExpressionAnnotation;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.optimizer.cost.ICost;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -38,7 +39,6 @@
     private final JoinEnum joinEnum;
 
     protected String datasetName;
-
     protected ILogicalOperator leafInput;
 
     protected JoinNode jn;
@@ -54,6 +54,8 @@
 
     protected ILogicalExpression joinExpr;
 
+    Pair<AbstractFunctionCallExpression, IndexedNLJoinExpressionAnnotation> exprAndHint;
+
     // Used to indicate which side to build for HJ and which side to broadcast for BHJ.
     protected HashJoinExpressionAnnotation.BuildSide side;
     protected IExpressionAnnotation joinHint;
@@ -291,9 +293,11 @@
     }
 
     protected void setJoinAndHintInfo(JoinMethod joinMethod, ILogicalExpression joinExpr,
+            Pair<AbstractFunctionCallExpression, IndexedNLJoinExpressionAnnotation> exprAndHint,
             HashJoinExpressionAnnotation.BuildSide side, IExpressionAnnotation hint) {
         joinOp = joinMethod;
         this.joinExpr = joinExpr;
+        this.exprAndHint = exprAndHint;
         this.side = side;
         joinHint = hint;
         numHintsUsed = joinEnum.allPlans.get(getLeftPlanIndex()).numHintsUsed
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-join/hints-indexnl-params/hints-indexnl-params-4.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-join/hints-indexnl-params/hints-indexnl-params-4.plan
new file mode 100644
index 0000000..29d7941
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/btree-index-join/hints-indexnl-params/hints-indexnl-params-4.plan
@@ -0,0 +1,24 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- SORT_MERGE_EXCHANGE [$$39(ASC) ]  |PARTITIONED|
+    -- STABLE_SORT [$$39(ASC)]  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- HYBRID_HASH_JOIN [$$36][$$37]  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$36]  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- BTREE_SEARCH (test.tenk1.tenk1)  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$37]  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ASSIGN  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- DATASOURCE_SCAN (test.tenk2)  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/nested-open-index/inverted-index-join/ngram-contains_01_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/nested-open-index/inverted-index-join/ngram-contains_01_ps.plan
new file mode 100644
index 0000000..84cd82b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/nested-open-index/inverted-index-join/ngram-contains_01_ps.plan
@@ -0,0 +1,44 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+              -- RANGE_PARTITION_EXCHANGE [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+                -- FORWARD  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- REPLICATE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BTREE_SEARCH (test.CSX.CSX)  |PARTITIONED|
+                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN (test.DBLP)  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- AGGREGATE  |UNPARTITIONED|
+                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                        -- AGGREGATE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- REPLICATE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH (test.CSX.CSX)  |PARTITIONED|
+                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN (test.DBLP)  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/nested-open-index/inverted-index-join/ngram-contains_02_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/nested-open-index/inverted-index-join/ngram-contains_02_ps.plan
new file mode 100644
index 0000000..74fe06d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/nested-open-index/inverted-index-join/ngram-contains_02_ps.plan
@@ -0,0 +1,44 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+              -- RANGE_PARTITION_EXCHANGE [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+                -- FORWARD  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- REPLICATE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BTREE_SEARCH (test.DBLP.DBLP)  |PARTITIONED|
+                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN (test.CSX)  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- AGGREGATE  |UNPARTITIONED|
+                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                        -- AGGREGATE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- REPLICATE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH (test.DBLP.DBLP)  |PARTITIONED|
+                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN (test.CSX)  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/open-index-enforced/inverted-index-join/ngram-contains_01_ps.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/open-index-enforced/inverted-index-join/ngram-contains_01_ps.plan
new file mode 100644
index 0000000..84cd82b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results_cbo/open-index-enforced/inverted-index-join/ngram-contains_01_ps.plan
@@ -0,0 +1,44 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+              -- RANGE_PARTITION_EXCHANGE [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+                -- FORWARD  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- REPLICATE  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STREAM_SELECT  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- BTREE_SEARCH (test.CSX.CSX)  |PARTITIONED|
+                                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN (test.DBLP)  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  -- BROADCAST_EXCHANGE  |PARTITIONED|
+                    -- AGGREGATE  |UNPARTITIONED|
+                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                        -- AGGREGATE  |PARTITIONED|
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- REPLICATE  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STREAM_SELECT  |PARTITIONED|
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- BTREE_SEARCH (test.CSX.CSX)  |PARTITIONED|
+                                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN (test.DBLP)  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.06.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.06.plan
index 70813fa..668a8ec 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.06.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/join/hash-join-with-redundant-variable/hash-join-with-redundant-variable.06.plan
@@ -1,50 +1,38 @@
-distribute result [$$36] [cardinality: 6016.3, op-cost: 0.0, total-cost: 98061.73]
+distribute result [$$36] [cardinality: 6016.3, op-cost: 0.0, total-cost: 95054.55]
 -- DISTRIBUTE_RESULT  |PARTITIONED|
-  exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 98061.73]
+  exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 95054.55]
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    project ([$$36]) [cardinality: 6016.3, op-cost: 0.0, total-cost: 98061.73]
+    project ([$$36]) [cardinality: 6016.3, op-cost: 0.0, total-cost: 95054.55]
     -- STREAM_PROJECT  |PARTITIONED|
-      assign [$$36] <- [{"o_orderkey": $$43, "l_orderkey": $$44, "l_suppkey": $$42}] [cardinality: 6016.3, op-cost: 0.0, total-cost: 98061.73]
+      assign [$$36] <- [{"o_orderkey": $$43, "l_orderkey": $$44, "l_suppkey": $$42}] [cardinality: 6016.3, op-cost: 0.0, total-cost: 95054.55]
       -- ASSIGN  |PARTITIONED|
-        exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 98061.73]
+        exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 95054.55]
         -- SORT_MERGE_EXCHANGE [$$43(ASC), $$44(ASC), $$42(ASC) ]  |PARTITIONED|
-          order (ASC, $$43) (ASC, $$44) (ASC, $$42) [cardinality: 6016.3, op-cost: 75532.61, total-cost: 98061.73]
+          order (ASC, $$43) (ASC, $$44) (ASC, $$42) [cardinality: 6016.3, op-cost: 75532.61, total-cost: 95054.55]
           -- STABLE_SORT [$$43(ASC), $$44(ASC), $$42(ASC)]  |PARTITIONED|
-            exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 22529.12]
+            exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 19521.94]
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              project ([$$43, $$44, $$42]) [cardinality: 6016.3, op-cost: 0.0, total-cost: 22529.12]
-              -- STREAM_PROJECT  |PARTITIONED|
-                exchange [cardinality: 6016.3, op-cost: 0.0, total-cost: 22529.12]
-                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  join (and(eq($$43, $$44), eq($$49, $$42))) [cardinality: 6016.3, op-cost: 7512.06, total-cost: 22529.12]
-                  -- HYBRID_HASH_JOIN [$$44, $$42][$$43, $$49]  |PARTITIONED|
-                    exchange [cardinality: 6005.0, op-cost: 6010.65, total-cost: 12015.65]
-                    -- HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                      project ([$$44, $$42]) [cardinality: 6005.0, op-cost: 0.0, total-cost: 6005.0]
-                      -- STREAM_PROJECT  |PARTITIONED|
-                        assign [$$42] <- [$$l.getField(2)] [cardinality: 6005.0, op-cost: 0.0, total-cost: 6005.0]
-                        -- ASSIGN  |PARTITIONED|
-                          project ([$$44, $$l]) [cardinality: 6005.0, op-cost: 0.0, total-cost: 6005.0]
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            exchange [cardinality: 6005.0, op-cost: 6010.65, total-cost: 12015.65]
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              data-scan []<-[$$44, $$45, $$l] <- tpch.LineItem [cardinality: 6005.0, op-cost: 6005.0, total-cost: 6005.0]
-                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                    exchange [cardinality: 1500.0, op-cost: 1501.41, total-cost: 3001.41]
-                    -- HASH_PARTITION_EXCHANGE [$$43, $$49]  |PARTITIONED|
-                      assign [$$49] <- [$$43] [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
-                      -- ASSIGN  |PARTITIONED|
-                        project ([$$43]) [cardinality: 1500.0, op-cost: 0.0, total-cost: 1500.0]
-                        -- STREAM_PROJECT  |PARTITIONED|
-                          exchange [cardinality: 1500.0, op-cost: 1501.41, total-cost: 3001.41]
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            data-scan []<-[$$43, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
-                            -- DATASOURCE_SCAN  |PARTITIONED|
+              select (eq($$43, $$42)) [cardinality: 6016.3, op-cost: 12016.29, total-cost: 19521.94]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$43, $$44, $$42]) [cardinality: 6010.65, op-cost: 0.0, total-cost: 6005.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$42] <- [$$l.getField(2)] [cardinality: 6010.65, op-cost: 0.0, total-cost: 6005.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$43, $$44, $$l]) [cardinality: 6010.65, op-cost: 0.0, total-cost: 6005.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 6010.65, op-cost: 0.0, total-cost: 6005.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$44, $$45, $$l] <- index-search("LineItem", 0, "Default", "tpch", "LineItem", true, true, 1, $$43, 1, $$43, true, true, true) [cardinality: 6010.65, op-cost: 6005.0, total-cost: 6005.0]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            project ([$$43]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
                               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                data-scan []<-[$$43, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|