[ASTERIXDB-3232][COMP] cleanup and simplification

Change-Id: Ie21d9d4bb9b24ff52974f6d60b2fa6e77dc5c28b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17663
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vijay Sarathy <vijay.sarathy@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index 3a482a4..52b4a4c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -23,7 +23,6 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -59,6 +58,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
@@ -76,11 +76,9 @@
     List<ILogicalOperator> newJoinOps;
     boolean[] unUsedJoinOps;
     List<JoinOperator> allJoinOps; // can be inner join or left outer join
-    HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap;
+    // Will be in the order of the from clause. Important for position ordering when assigning bits to join expressions.
+    List<ILogicalOperator> leafInputs;
     HashMap<LogicalVariable, Integer> varLeafInputIds;
-    // The data scan operators. Will be in the order of the from clause.
-    // Important for position ordering when assigning bits to join expressions.
-    List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps;
     List<Triple<Integer, Integer, Boolean>> buildSets; // the first is the bits and the second is the number of tables.
     List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList;
     List<AssignOperator> assignOps;
@@ -128,11 +126,8 @@
         //joinOps = new ArrayList<>();
         allJoinOps = new ArrayList<>();
         newJoinOps = new ArrayList<>();
-        joinLeafInputsHashMap = new HashMap<>();
+        leafInputs = new ArrayList<>();
         varLeafInputIds = new HashMap<>();
-        // The data scan operators. Will be in the order of the from clause.
-        // Important for position ordering when assigning bits to join expressions.
-        emptyTupleAndDataSourceOps = new ArrayList<>();
         outerJoinsDependencyList = new ArrayList<>();
         assignOps = new ArrayList<>();
         assignJoinExprs = new ArrayList<>();
@@ -149,47 +144,35 @@
 
         //convertOuterJoinstoJoinsIfPossible(outerJoinsDependencyList);
 
-        // if this happens, something in the input plan is not acceptable to the new code.
-        if (emptyTupleAndDataSourceOps.size() != joinLeafInputsHashMap.size()) {
-            throw new IllegalStateException(
-                    "ETS " + emptyTupleAndDataSourceOps.size() + " != LI " + joinLeafInputsHashMap.size());
-        }
-
         printPlan(pp, (AbstractLogicalOperator) op, "Original Whole plan2");
+        int numberOfFromTerms = leafInputs.size();
 
-        int numberOfFromTerms = emptyTupleAndDataSourceOps.size();
-
-        //String viewInPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
-        //System.out.println("viewInPlan");
-        //System.out.println(viewInPlan);
+        if (LOGGER.isTraceEnabled()) {
+            String viewInPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
+            LOGGER.trace("viewInPlan");
+            LOGGER.trace(viewInPlan);
+        }
 
         if (buildSets.size() > 1) {
             buildSets.sort(Comparator.comparingDouble(o -> o.second)); // sort on the number of tables in each set
             // we need to build the smaller sets first. So we need to find these first.
         }
-        joinEnum.initEnum((AbstractLogicalOperator) op, cboMode, cboTestMode, numberOfFromTerms,
-                emptyTupleAndDataSourceOps, joinLeafInputsHashMap, allJoinOps, assignOps, outerJoinsDependencyList,
-                buildSets, context);
+        joinEnum.initEnum((AbstractLogicalOperator) op, cboMode, cboTestMode, numberOfFromTerms, leafInputs, allJoinOps,
+                assignOps, outerJoinsDependencyList, buildSets, varLeafInputIds, context);
 
         if (cboMode) {
-            if (!doAllDataSourcesHaveSamples(emptyTupleAndDataSourceOps, context)) {
+            if (!doAllDataSourcesHaveSamples(leafInputs, context)) {
                 return false;
             }
         }
 
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("---------------------------- Printing Leaf Inputs1");
-            printLeafPlans(pp, joinLeafInputsHashMap);
-        }
+        printLeafPlans(pp, leafInputs, "Inputs1");
 
         if (assignOps.size() > 0) {
-            pushAssignsIntoLeafInputs(pp, joinLeafInputsHashMap, assignOps, assignJoinExprs);
+            pushAssignsIntoLeafInputs(pp, leafInputs, assignOps, assignJoinExprs);
         }
 
-        if (LOGGER.isTraceEnabled()) {
-            LOGGER.trace("---------------------------- Printing Leaf Inputs2");
-            printLeafPlans(pp, joinLeafInputsHashMap);
-        }
+        printLeafPlans(pp, leafInputs, "Inputs2");
 
         int cheapestPlan = joinEnum.enumerateJoins(); // MAIN CALL INTO CBO
         if (cheapestPlan == PlanNode.NO_PLAN) {
@@ -209,9 +192,8 @@
                 return false; // there are some cases such as R OJ S on true. Here there is an OJ predicate but the code in findJoinConditions
                 // in JoinEnum does not capture this. Will fix later. Just bail for now.
             }
-            buildNewTree(cheapestPlanNode, joinLeafInputsHashMap, newJoinOps, new MutableInt(0));
+            buildNewTree(cheapestPlanNode, newJoinOps, new MutableInt(0));
             opRef.setValue(newJoinOps.get(0));
-            //String vp = new ALogicalPlanImpl(opRef).toString();
 
             if (assignOps.size() > 0) {
                 for (int i = assignOps.size() - 1; i >= 0; i--) {
@@ -231,15 +213,19 @@
 
             // this will be the new root
             opRef.setValue(root);
-            //String viewOutPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
-            //System.out.println("viewInPlan again");
-            //System.out.println(viewInPlan);
-            //System.out.println("viewOutPlan");
-            //System.out.println(viewOutPlan);
+
+            if (LOGGER.isTraceEnabled()) {
+                String viewInPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
+                LOGGER.trace("viewInPlanAgain");
+                LOGGER.trace(viewInPlan);
+                String viewOutPlan = new ALogicalPlanImpl(opRef).toString(); //useful when debugging
+                LOGGER.trace("viewOutPlan");
+                LOGGER.trace(viewOutPlan);
+            }
 
             if (LOGGER.isTraceEnabled()) {
                 LOGGER.trace("---------------------------- Printing Leaf Inputs");
-                printLeafPlans(pp, joinLeafInputsHashMap);
+                printLeafPlans(pp, leafInputs, "Inputs");
                 // print joins starting from the bottom
                 for (int i = newJoinOps.size() - 1; i >= 0; i--) {
                     printPlan(pp, (AbstractLogicalOperator) newJoinOps.get(i), "join " + i);
@@ -254,7 +240,7 @@
             }
 
         } else {
-            buildNewTree(cheapestPlanNode, joinLeafInputsHashMap);
+            buildNewTree(cheapestPlanNode);
         }
 
         return true;
@@ -484,10 +470,13 @@
                 rightSideExprBits = 0;
                 conj.getValue().getUsedVariables(joinExprVars);
                 for (LogicalVariable lv : joinExprVars) {
-                    if (foundVar(lv, leftOp)) {
-                        leftSideExprBits |= 1 << (getLeafInputId(lv) - 1);
-                    } else {
-                        rightSideExprBits |= 1 << (getLeafInputId(lv) - 1);
+                    int id = getLeafInputId(lv);
+                    if (id != -1) {
+                        if (foundVar(lv, leftOp)) {
+                            leftSideExprBits |= 1 << (id - 1);
+                        } else {
+                            rightSideExprBits |= 1 << (id - 1);
+                        }
                     }
                 }
                 if (leftSideExprBits != 0 && rightSideExprBits != 0) {// avoid expressions like true
@@ -500,10 +489,13 @@
             joinExprVars = new ArrayList<>();
             expr.getUsedVariables(joinExprVars);
             for (LogicalVariable lv : joinExprVars) {
-                if (foundVar(lv, leftOp)) {
-                    leftSideExprBits |= 1 << (getLeafInputId(lv) - 1);
-                } else {
-                    rightSideExprBits |= 1 << (getLeafInputId(lv) - 1);
+                int id = getLeafInputId(lv);
+                if (id != -1) {
+                    if (foundVar(lv, leftOp)) {
+                        leftSideExprBits |= 1 << (id - 1);
+                    } else {
+                        rightSideExprBits |= 1 << (id - 1);
+                    }
                 }
             }
             if (leftSideExprBits != 0 && rightSideExprBits != 0) {// avoid expressions like true
@@ -576,17 +568,16 @@
             if (etsDataSource != null) { // a leaf input
                 EmptyTupleSourceOperator etsOp = etsDataSource.first;
                 DataSourceScanOperator dataSourceOp = etsDataSource.second;
-                emptyTupleAndDataSourceOps.add(new Pair<>(etsOp, dataSourceOp));
                 if (op.getOperatorTag().equals(LogicalOperatorTag.DISTRIBUTE_RESULT)) {// single table query
                     ILogicalOperator selectOp = findSelectOrDataScan(op);
                     if (selectOp == null) {
                         return false;
                     } else {
-                        joinLeafInputsHashMap.put(etsOp, selectOp);
+                        leafInputs.add(selectOp);
                     }
                 } else {
                     leafInputNumber++;
-                    joinLeafInputsHashMap.put(etsOp, op);
+                    leafInputs.add(op);
                     if (!addLeafInputNumbersToVars(op)) {
                         return false;
                     }
@@ -607,6 +598,8 @@
     }
 
     private void addCardCostAnnotations(ILogicalOperator op, PlanNode plan) {
+        if (op == null)
+            return;
         op.getAnnotations().put(OperatorAnnotations.OP_OUTPUT_CARDINALITY,
                 (double) Math.round(plan.getJoinNode().getCardinality() * 100) / 100);
         op.getAnnotations().put(OperatorAnnotations.OP_COST_TOTAL,
@@ -641,7 +634,7 @@
             }
             op = op.getInputs().get(0).getValue();
         }
-        return origOp;
+        return null;
     }
 
     private void removeJoinAnnotations(AbstractFunctionCallExpression afcExpr) {
@@ -719,9 +712,8 @@
     }
 
     // This is for single table queries
-    private void buildNewTree(PlanNode plan,
-            HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap) {
-        ILogicalOperator leftInput = joinLeafInputsHashMap.get(plan.getEmptyTupleSourceOp());
+    private void buildNewTree(PlanNode plan) {
+        ILogicalOperator leftInput = plan.getLeafInput();
         skipAllIndexes(plan, leftInput);
         ILogicalOperator selOp = findSelectOrDataScan(leftInput);
         if (selOp != null) {
@@ -759,8 +751,8 @@
     }
 
     // This one is for join queries
-    private void buildNewTree(PlanNode plan, HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap,
-            List<ILogicalOperator> joinOps, MutableInt totalNumberOfJoins) throws AlgebricksException {
+    private void buildNewTree(PlanNode plan, List<ILogicalOperator> joinOps, MutableInt totalNumberOfJoins)
+            throws AlgebricksException {
         // we have to move the inputs in op around so that they match the tree structure in pn
         // we use the existing joinOps and switch the leafInputs appropriately.
         List<PlanNode> allPlans = joinEnum.getAllPlans();
@@ -783,6 +775,9 @@
                 AbstractFunctionCallExpression afcExpr = (AbstractFunctionCallExpression) expr;
                 removeJoinAnnotations(afcExpr);
                 setAnnotation(afcExpr, IndexedNLJoinExpressionAnnotation.INSTANCE_ANY_INDEX);
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace("Added IndexedNLJoinExpressionAnnotation.INSTANCE_ANY_INDEX to " + afcExpr.toString());
+                }
             } else if (plan.getJoinOp() == PlanNode.JoinMethod.HYBRID_HASH_JOIN
                     || plan.getJoinOp() == PlanNode.JoinMethod.BROADCAST_HASH_JOIN
                     || plan.getJoinOp() == PlanNode.JoinMethod.CARTESIAN_PRODUCT_JOIN) {
@@ -795,11 +790,17 @@
                     AbstractFunctionCallExpression afcExpr = (AbstractFunctionCallExpression) expr;
                     removeJoinAnnotations(afcExpr);
                     setAnnotation(afcExpr, bcast);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("Added BroadCastAnnotation to " + afcExpr.toString());
+                    }
                 } else if (plan.getJoinOp() == PlanNode.JoinMethod.HYBRID_HASH_JOIN) {
                     HashJoinExpressionAnnotation hjAnnotation = new HashJoinExpressionAnnotation(plan.side);
                     AbstractFunctionCallExpression afcExpr = (AbstractFunctionCallExpression) expr;
                     removeJoinAnnotations(afcExpr);
                     setAnnotation(afcExpr, hjAnnotation);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("Added HashJoinAnnotation to " + afcExpr.toString());
+                    }
                 } else {
                     if (expr != ConstantExpression.TRUE) {
                         AbstractFunctionCallExpression afcExpr = (AbstractFunctionCallExpression) expr;
@@ -812,7 +813,7 @@
 
         if (leftPlan.IsScanNode()) {
             // leaf
-            ILogicalOperator leftInput = joinLeafInputsHashMap.get(leftPlan.getEmptyTupleSourceOp());
+            ILogicalOperator leftInput = leftPlan.getLeafInput();
             skipAllIndexes(leftPlan, leftInput);
             ILogicalOperator selOp = findSelectOrDataScan(leftInput);
             if (selOp != null) {
@@ -825,12 +826,12 @@
             totalNumberOfJoins.increment();
             ILogicalOperator leftInput = joinOps.get(totalNumberOfJoins.intValue());
             joinOp.getInputs().get(0).setValue(leftInput);
-            buildNewTree(leftPlan, joinLeafInputsHashMap, joinOps, totalNumberOfJoins);
+            buildNewTree(leftPlan, joinOps, totalNumberOfJoins);
         }
 
         if (rightPlan.IsScanNode()) {
             // leaf
-            ILogicalOperator rightInput = joinLeafInputsHashMap.get(rightPlan.getEmptyTupleSourceOp());
+            ILogicalOperator rightInput = rightPlan.getLeafInput();
             skipAllIndexes(rightPlan, rightInput);
             ILogicalOperator selOp = findSelectOrDataScan(rightInput);
             if (selOp != null) {
@@ -843,7 +844,7 @@
             totalNumberOfJoins.increment();
             ILogicalOperator rightInput = joinOps.get(totalNumberOfJoins.intValue());
             joinOp.getInputs().get(1).setValue(rightInput);
-            buildNewTree(rightPlan, joinLeafInputsHashMap, joinOps, totalNumberOfJoins);
+            buildNewTree(rightPlan, joinOps, totalNumberOfJoins);
         }
     }
 
@@ -870,53 +871,49 @@
         }
     }
 
-    private void printLeafPlans(IPlanPrettyPrinter pp,
-            HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap) throws AlgebricksException {
-        Iterator<Map.Entry<EmptyTupleSourceOperator, ILogicalOperator>> li =
-                joinLeafInputsHashMap.entrySet().iterator();
-        int i = 0;
-        while (li.hasNext()) {
-            Map.Entry<EmptyTupleSourceOperator, ILogicalOperator> pair = li.next();
-            ILogicalOperator element = pair.getValue();
-            printPlan(pp, (AbstractLogicalOperator) element, "Printing Leaf Input" + i);
-            i++;
+    private void printLeafPlans(IPlanPrettyPrinter pp, List<ILogicalOperator> leafInputs, String msg)
+            throws AlgebricksException {
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace(msg);
+            int i = 0;
+            for (ILogicalOperator element : leafInputs) {
+                printPlan(pp, (AbstractLogicalOperator) element, "Printing Leaf Input" + i);
+                i++;
+            }
         }
     }
 
     // for every internal edge assign (again assuming only 1 for now), find the corresponding leafInput and place the assign
     // on top of that LeafInput. Modify the joinLeafInputsHashMap as well.
-    private void pushAssignsIntoLeafInputs(IPlanPrettyPrinter pp,
-            HashMap<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<AssignOperator> assignOps,
-            List<ILogicalExpression> assignJoinExprs) throws AlgebricksException {
-
-        for (Map.Entry<EmptyTupleSourceOperator, ILogicalOperator> mapElement : joinLeafInputsHashMap.entrySet()) {
-            ILogicalOperator joinLeafInput = mapElement.getValue();
+    private void pushAssignsIntoLeafInputs(IPlanPrettyPrinter pp, List<ILogicalOperator> leafInputs,
+            List<AssignOperator> assignOps, List<ILogicalExpression> assignJoinExprs) throws AlgebricksException {
+        int pos = 0;
+        for (ILogicalOperator lo : leafInputs) {
+            ILogicalOperator joinLeafInput = lo;
             printPlan(pp, (AbstractLogicalOperator) joinLeafInput, "Incoming leaf Input");
-            EmptyTupleSourceOperator ets = mapElement.getKey();
             int assignNumber = findAssignOp(joinLeafInput, assignOps, assignJoinExprs);
             if (assignNumber != -1) {
                 joinLeafInput = addAssignToLeafInput(joinLeafInput, assignOps.get(assignNumber));
                 printPlan(pp, (AbstractLogicalOperator) joinLeafInput, "Modified leaf Input");
-                joinLeafInputsHashMap.put(ets, joinLeafInput);
+                leafInputs.add(pos, joinLeafInput);
                 assignOps.remove(assignNumber);
             }
+            pos++;
         }
-
     }
 
     // check to see if every dataset has a sample. If not, CBO code cannot run. A warning message must be issued as well.
-    private boolean doAllDataSourcesHaveSamples(
-            List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps,
-            IOptimizationContext context) throws AlgebricksException {
-        for (Pair<EmptyTupleSourceOperator, DataSourceScanOperator> emptyTupleAndDataSourceOp : emptyTupleAndDataSourceOps) {
-            if (emptyTupleAndDataSourceOp.getSecond() != null) {
-                DataSourceScanOperator scanOp = emptyTupleAndDataSourceOp.getSecond();
-                Index index = joinEnum.getStatsHandle().findSampleIndex(scanOp, context);
-                if (index == null) {
-                    return false;
-                }
+    private boolean doAllDataSourcesHaveSamples(List<ILogicalOperator> leafInputs, IOptimizationContext context)
+            throws AlgebricksException {
+        for (ILogicalOperator li : leafInputs) {
+            DataSourceScanOperator scanOp = (DataSourceScanOperator) findDataSourceScanOperator(li);
+            if (scanOp == null)
+                continue;
+            Index index = joinEnum.getStatsHandle().findSampleIndex(scanOp, context);
+            if (index == null) {
+                return false;
             }
         }
         return true;
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index 140a069..3ce4e9f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -52,7 +52,6 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Quadruple;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -74,7 +73,6 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -104,11 +102,11 @@
     protected List<PlanNode> allPlans; // list of all plans
     protected JoinNode[] jnArray; // array of all join nodes
     protected int jnArraySize;
-    private List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps;
-    protected Map<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap;
+    protected List<ILogicalOperator> leafInputs;
     protected List<ILogicalExpression> singleDatasetPreds;
     protected List<AssignOperator> assignOps;
     List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList;
+    HashMap<LogicalVariable, Integer> varLeafInputIds;
     protected List<JoinOperator> allJoinOps;
     protected ILogicalOperator localJoinOp; // used in nestedLoopsApplicable code.
     protected IOptimizationContext optCtx;
@@ -134,11 +132,10 @@
     }
 
     protected void initEnum(AbstractLogicalOperator op, boolean cboMode, boolean cboTestMode, int numberOfFromTerms,
-            List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps,
-            Map<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap, List<JoinOperator> allJoinOps,
-            List<AssignOperator> assignOps,
+            List<ILogicalOperator> leafInputs, List<JoinOperator> allJoinOps, List<AssignOperator> assignOps,
             List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList,
-            List<Triple<Integer, Integer, Boolean>> buildSets, IOptimizationContext context) throws AsterixException {
+            List<Triple<Integer, Integer, Boolean>> buildSets, HashMap<LogicalVariable, Integer> varLeafInputIds,
+            IOptimizationContext context) throws AsterixException {
         this.singleDatasetPreds = new ArrayList<>();
         this.joinConditions = new ArrayList<>();
         this.joinHints = new HashMap<>();
@@ -150,13 +147,13 @@
         this.cboCPEnumMode = getCBOCPEnumMode(context);
         this.connectedJoinGraph = true;
         this.optCtx = context;
-        this.emptyTupleAndDataSourceOps = emptyTupleAndDataSourceOps;
-        this.joinLeafInputsHashMap = joinLeafInputsHashMap;
+        this.leafInputs = leafInputs;
         this.assignOps = assignOps;
         this.outerJoin = false; // assume no outerjoins anywhere in the query at first.
         this.outerJoinsDependencyList = outerJoinsDependencyList;
         this.allJoinOps = allJoinOps;
         this.buildSets = buildSets;
+        this.varLeafInputIds = varLeafInputIds;
         this.op = op;
         this.forceJoinOrderMode = getForceJoinOrderMode(context);
         this.queryPlanShape = getQueryPlanShape(context);
@@ -221,9 +218,7 @@
 
     protected ILogicalOperator findLeafInput(List<LogicalVariable> logicalVars) throws AlgebricksException {
         Set<LogicalVariable> vars = new HashSet<>();
-        for (Pair<EmptyTupleSourceOperator, DataSourceScanOperator> emptyTupleAndDataSourceOp : emptyTupleAndDataSourceOps) {
-            EmptyTupleSourceOperator emptyOp = emptyTupleAndDataSourceOp.getFirst();
-            ILogicalOperator op = joinLeafInputsHashMap.get(emptyOp);
+        for (ILogicalOperator op : leafInputs) {
             vars.clear();
             // this is expensive to do. So store this once and reuse
             VariableUtilities.getLiveVariables(op, vars);
@@ -385,48 +380,6 @@
         return JoinNode.NO_JN;
     }
 
-    protected int findJoinNodeIndex(LogicalVariable lv) throws AlgebricksException {
-        List<Pair<EmptyTupleSourceOperator, DataSourceScanOperator>> emptyTupleAndDataSourceOps =
-                this.emptyTupleAndDataSourceOps;
-        Map<EmptyTupleSourceOperator, ILogicalOperator> joinLeafInputsHashMap = this.joinLeafInputsHashMap;
-
-        for (Map.Entry<EmptyTupleSourceOperator, ILogicalOperator> mapElement : joinLeafInputsHashMap.entrySet()) {
-            ILogicalOperator joinLeafInput = mapElement.getValue();
-            HashSet<LogicalVariable> vars = new HashSet<>();
-            // this should get the variables from the inputs only, since the join condition is itself set to null
-            VariableUtilities.getLiveVariables(joinLeafInput, vars);
-            if (vars.contains(lv)) {
-                EmptyTupleSourceOperator key = mapElement.getKey();
-                for (int i = 0; i < emptyTupleAndDataSourceOps.size(); i++) {
-                    if (key.equals(emptyTupleAndDataSourceOps.get(i).getFirst())) {
-                        return i;
-                    }
-                }
-            }
-        }
-        return JoinNode.NO_JN;
-    }
-
-    private int findBits(LogicalVariable lv) throws AlgebricksException {
-        int idx = findJoinNodeIndex(lv);
-        if (idx >= 0) {
-            return 1 << idx;
-        }
-
-        // so this variable must be in an internal edge in an assign statement. Find the RHS variables there
-        for (AssignOperator op : this.assignOps) {
-            List<LogicalVariable> vars2 = new ArrayList<>();
-            VariableUtilities.getUsedVariables(op, vars2);
-            int bits = 0;
-            for (LogicalVariable lv2 : vars2) {
-                bits |= findBits(lv2);
-            }
-            return bits;
-        }
-
-        return JoinNode.NO_JN;
-    }
-
     // This finds all the join Conditions in the whole query. This is a global list of all join predicates.
     // It also fills in the dataset Bits for each join predicate.
     private void findJoinConditionsAndAssignSels() throws AlgebricksException {
@@ -496,7 +449,7 @@
             jc.numberOfVars = usedVars.size();
 
             for (int i = 0; i < jc.numberOfVars; i++) {
-                int bits = findBits(usedVars.get(i)); // rename to findInWhichLeaf
+                int bits = 1 << (varLeafInputIds.get(usedVars.get(i)) - 1);
                 if (bits != JoinCondition.NO_JC) {
                     if (i == 0) {
                         jc.leftSideBits = bits;
@@ -630,7 +583,6 @@
             return -1;
         }
         for (i = 0; i < buildSets.size(); i++) {
-            //System.out.println("first " + buildSets.get(i).first + " second " + buildSets.get(i).second + " numtabs " + numbTabs + " bits " + jbits);
             if ((buildSets.get(i).third) && (buildSets.get(i).first & jbits) > 0) {
                 return i;
             }
@@ -650,7 +602,6 @@
             JoinNode jnI = jnArray[i];
             jnI.jnArrayIndex = i;
             if (jnI.highestDatasetId == 0) {
-                // this jn can be skipped
                 continue;
             }
 
@@ -790,10 +741,8 @@
             jn.jnArrayIndex = i;
             jn.datasetBits = 1 << (i - 1);
             jn.datasetIndexes = new ArrayList<>(Collections.singleton(i));
-            EmptyTupleSourceOperator ets = emptyTupleAndDataSourceOps.get(i - 1).getFirst();
-            ILogicalOperator leafInput = joinLeafInputsHashMap.get(ets);
-
-            DataSourceScanOperator scanOp = emptyTupleAndDataSourceOps.get(i - 1).getSecond();
+            ILogicalOperator leafInput = leafInputs.get(i - 1);
+            DataSourceScanOperator scanOp = findDataSourceScanOperator(leafInput);
             if (scanOp != null) {
                 DataSourceId id = (DataSourceId) scanOp.getDataSource().getId();
                 jn.aliases = new ArrayList<>(Collections.singleton(findAlias(scanOp)));
@@ -835,7 +784,7 @@
             if (jn.origCardinality >= Cost.MAX_CARD) {
                 noCards = true;
             }
-            jn.correspondingEmptyTupleSourceOp = emptyTupleAndDataSourceOps.get(i - 1).getFirst();
+            jn.leafInput = leafInputs.get(i - 1);
             jn.highestDatasetId = i;
             jn.level = 1;
         }
@@ -845,6 +794,17 @@
         return numberOfTerms;
     }
 
+    private DataSourceScanOperator findDataSourceScanOperator(ILogicalOperator op) {
+        ILogicalOperator origOp = op;
+        while (op != null && op.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+            if (op.getOperatorTag().equals(LogicalOperatorTag.DATASOURCESCAN)) {
+                return (DataSourceScanOperator) op;
+            }
+            op = op.getInputs().get(0).getValue();
+        }
+        return null;
+    }
+
     // Most of this work is done in the very first line by calling initializeBaseLevelJoinNodes().
     // the remaining work here is to find the selectivities of the predicates using sampling.
     // By the time execution reaches this point, samples are guaranteed to exist on all datasets,
@@ -860,8 +820,7 @@
         for (int i = 1; i <= this.numberOfTerms; i++) {
             JoinNode jn = jnArray[i];
             Index.SampleIndexDetails idxDetails = jn.getIdxDetails();
-            EmptyTupleSourceOperator ets = this.emptyTupleAndDataSourceOps.get(i - 1).getFirst();
-            ILogicalOperator leafInput = this.joinLeafInputsHashMap.get(ets);
+            ILogicalOperator leafInput = this.leafInputs.get(i - 1);
             if (!cboTestMode) {
                 if (idxDetails == null) {
                     dataScanPlan = jn.addSingleDatasetPlans();
@@ -873,7 +832,7 @@
                 double origDatasetCard, finalDatasetCard, sampleCard;
 
                 ILogicalOperator parent = findDataSourceScanOperatorParent(leafInput);
-                DataSourceScanOperator scanOp = this.emptyTupleAndDataSourceOps.get(i - 1).getSecond();
+                DataSourceScanOperator scanOp = findDataSourceScanOperator(leafInput);
                 if (scanOp == null) {
                     continue; // what happens to the cards and sizes then? this may happen in case of in lists
                 }
@@ -1078,6 +1037,7 @@
         markCompositeJoinPredicates();
         int lastJnNum = enumerateHigherLevelJoinNodes();
         JoinNode lastJn = jnArray[allTabsJnNum];
+        //System.out.println(dumpJoinNodes(allTabsJnNum));
         if (LOGGER.isTraceEnabled()) {
             EnumerateJoinsRule.printPlan(pp, op, "Original Whole plan in JN END");
             LOGGER.trace(dumpJoinNodes(lastJnNum));
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
index 41ef4af..4c49cb3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
@@ -59,7 +59,6 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -88,7 +87,7 @@
     private JoinNode rightJn;
     private JoinNode leftJn;
     private List<Integer> applicableJoinConditions;
-    protected EmptyTupleSourceOperator correspondingEmptyTupleSourceOp; // There is a 1-1 relationship between the LVs and the dataSourceScanOps and the leafInputs.
+    protected ILogicalOperator leafInput;
     private List<Pair<IAccessMethod, Index>> chosenIndexes;
     private Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs;
     protected Index.SampleIndexDetails idxDetails;
@@ -193,8 +192,7 @@
             return false;
         }
 
-        // We need to find out which one of these is the inner joinLeafInput. So for that get the joinLeafInput of this join node.
-        ILogicalOperator innerLeafInput = joinEnum.joinLeafInputsHashMap.get(this.correspondingEmptyTupleSourceOp);
+        ILogicalOperator innerLeafInput = this.leafInput;
 
         // This must equal one of the two joinLeafInputsHashMap found above. check for sanity!!
         if (innerLeafInput != joinLeafInput1 && innerLeafInput != joinLeafInput0) {
@@ -388,7 +386,7 @@
             pn = new PlanNode(allPlans.size(), joinEnum);
             pn.setJoinNode(this);
             pn.datasetName = this.datasetNames.get(0);
-            pn.correspondingEmptyTupleSourceOp = this.correspondingEmptyTupleSourceOp;
+            pn.leafInput = this.leafInput;
             pn.setLeftJoinIndex(this.jnArrayIndex);
             pn.setRightJoinIndex(JoinNode.NO_JN);
             pn.setLeftPlanIndex(PlanNode.NO_PLAN); // There ane no plans below this plan.
@@ -581,7 +579,7 @@
             pn = new PlanNode(allPlans.size(), joinEnum);
             pn.setJoinNode(this);
             pn.setDatasetName(getDatasetNames().get(0));
-            pn.setEmptyTupleSourceOp(this.correspondingEmptyTupleSourceOp);
+            pn.setLeafInput(this.leafInput);
             pn.setLeftJoinIndex(this.jnArrayIndex);
             pn.setRightJoinIndex(JoinNode.NO_JN);
             pn.setLeftPlanIndex(PlanNode.NO_PLAN); // There ane no plans below this plan.
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
index 9dc7d6c..b017140 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/PlanNode.java
@@ -22,10 +22,10 @@
 import org.apache.asterix.optimizer.cost.ICost;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.expressions.HashJoinExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 
 public class PlanNode {
 
@@ -50,7 +50,7 @@
     protected ScanMethod scanOp;
     protected ILogicalExpression joinExpr;
     private DataSourceScanOperator correspondingDataSourceScanOp;
-    protected EmptyTupleSourceOperator correspondingEmptyTupleSourceOp;
+    protected ILogicalOperator leafInput;
 
     public enum ScanMethod {
         INDEX_SCAN,
@@ -166,12 +166,12 @@
         return correspondingDataSourceScanOp; // This applies only to singleDataSetPlans
     }
 
-    protected EmptyTupleSourceOperator getEmptyTupleSourceOp() {
-        return correspondingEmptyTupleSourceOp; // This applies only to singleDataSetPlans
+    protected ILogicalOperator getLeafInput() {
+        return leafInput; // This applies only to singleDataSetPlans
     }
 
-    protected void setEmptyTupleSourceOp(EmptyTupleSourceOperator emptyTupleSourceOp) {
-        this.correspondingEmptyTupleSourceOp = emptyTupleSourceOp; // This applies only to singleDataSetPlans
+    protected void setLeafInput(ILogicalOperator leafInput) {
+        this.leafInput = leafInput; // This applies only to singleDataSetPlans
     }
 
     public ICost getOpCost() {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
index b285de2..13082f2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
@@ -54,6 +54,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -98,8 +99,16 @@
             // Since there is a left and right dataset here, expecting only two variables.
             return 1.0;
         }
-        int idx1 = joinEnum.findJoinNodeIndex(exprUsedVars.get(0)) + 1;
-        int idx2 = joinEnum.findJoinNodeIndex(exprUsedVars.get(1)) + 1;
+        int idx1, idx2;
+        if (joinEnum.varLeafInputIds.containsKey(exprUsedVars.get(0))) {
+            idx1 = joinEnum.varLeafInputIds.get(exprUsedVars.get(0));
+        } else
+            return 1.0;
+        if (joinEnum.varLeafInputIds.containsKey(exprUsedVars.get(1))) {
+            idx2 = joinEnum.varLeafInputIds.get(exprUsedVars.get(1));
+        } else
+            return 1.0;
+
         double card1 = joinEnum.getJnArray()[idx1].origCardinality;
         double card2 = joinEnum.getJnArray()[idx2].origCardinality;
         if (card1 == 0.0 || card2 == 0.0) // should not happen
@@ -503,6 +512,9 @@
         OperatorPropertiesUtil.typeOpRec(newAggOpRef, newCtx);
         LOGGER.info("***returning from sample query***");
 
+        String viewInPlan = new ALogicalPlanImpl(newAggOpRef).toString(); //useful when debugging
+        LOGGER.trace("viewInPlan");
+        LOGGER.trace(viewInPlan);
         return AnalysisUtil.runQuery(newAggOpRef, Arrays.asList(aggVar), newCtx, IRuleSetFactory.RuleSetKind.SAMPLING);
     }
 }