[ASTERIXDB-3303][COMP] Projection Sizes continued

Change-Id: Iba8bf1171750994195a3426f22c25a99720f0983
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17967
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vijay Sarathy <vijay.sarathy@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
index 82e7b32..e8e19e1 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EnumerateJoinsRule.java
@@ -94,6 +94,8 @@
     // The OrderBy operator at root of the query tree (if exists)
     private ILogicalOperator rootOrderByOp;
 
+    private List<LogicalVariable> resultAndJoinVars = new ArrayList();
+
     public EnumerateJoinsRule(JoinEnum joinEnum) {
         this.joinEnum = joinEnum;
         dataScanAndGroupByDistinctOps = new HashMap<>(); // initialized only once at the beginning of the rule
@@ -138,6 +140,17 @@
 
             // Find the order by op, so we can annotate cost/cards
             findOrderByOp(op);
+
+            // Find the topmost assign, so we can find all the final projected variables.
+            ILogicalOperator tmp = op;
+
+            while (tmp.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                if (tmp.getOperatorTag().equals(LogicalOperatorTag.ASSIGN)) {
+                    addAllAssignExprVars(resultAndJoinVars, (AssignOperator) tmp);
+                    break;
+                }
+                tmp = tmp.getInputs().get(0).getValue();
+            }
         }
 
         // if this join has already been seen before, no need to apply the rule again
@@ -163,6 +176,8 @@
             return false;
         }
 
+        collectJoinConditionsVariables(); // will be used for determining which variables will be projected from the base levels
+
         convertOuterJoinstoJoinsIfPossible(outerJoinsDependencyList);
 
         printPlan(pp, (AbstractLogicalOperator) op, "Original Whole plan2");
@@ -180,7 +195,7 @@
         }
         joinEnum.initEnum((AbstractLogicalOperator) op, cboMode, cboTestMode, numberOfFromTerms, leafInputs, allJoinOps,
                 assignOps, outerJoinsDependencyList, buildSets, varLeafInputIds, dataScanAndGroupByDistinctOps,
-                rootGroupByDistinctOp, rootOrderByOp, context);
+                rootGroupByDistinctOp, rootOrderByOp, resultAndJoinVars, context);
 
         if (cboMode) {
             if (!doAllDataSourcesHaveSamples(leafInputs, context)) {
@@ -252,19 +267,34 @@
                 printPlan(pp, (AbstractLogicalOperator) newJoinOps.get(0), "New Whole Plan");
                 printPlan(pp, (AbstractLogicalOperator) root, "New Whole Plan");
             }
-
             // turn of this rule for all joins in this set (subtree)
             for (ILogicalOperator joinOp : newJoinOps) {
                 context.addToDontApplySet(this, joinOp);
             }
-
         } else {
             buildNewTree(cheapestPlanNode);
         }
-
         return true;
     }
 
+    private void collectJoinConditionsVariables() {
+        for (JoinOperator jOp : allJoinOps) {
+            AbstractBinaryJoinOperator joinOp = jOp.getAbstractJoinOp();
+            ILogicalExpression expr = joinOp.getCondition().getValue();
+            List<LogicalVariable> vars = new ArrayList<>();
+            expr.getUsedVariables(vars);
+            resultAndJoinVars.addAll(vars); // collect all the variables used in join expressions. These will be projected from the base level
+        }
+    }
+
+    private void addAllAssignExprVars(List<LogicalVariable> resultAndJoinVars, AssignOperator op) {
+        for (Mutable<ILogicalExpression> exp : op.getExpressions()) {
+            List<LogicalVariable> vars = new ArrayList<>();
+            exp.getValue().getUsedVariables(vars);
+            resultAndJoinVars.addAll(vars);
+        }
+    }
+
     private void pushAssignsAboveJoins(ILogicalOperator op, AssignOperator aOp, ILogicalExpression jexpr,
             MutableBoolean removed) {
         System.out.println("op " + op.toString());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index 3a5dda7..510d8bb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.declared.SampleDataSource;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.constants.AsterixConstantValue;
@@ -138,6 +139,7 @@
     protected String queryPlanShape;
     protected ICost cost;
     protected ICostMethods costMethods;
+    List<LogicalVariable> resultAndJoinVars;
 
     public JoinEnum() {
     }
@@ -147,8 +149,8 @@
             List<Quadruple<Integer, Integer, JoinOperator, Integer>> outerJoinsDependencyList,
             List<Triple<Integer, Integer, Boolean>> buildSets, HashMap<LogicalVariable, Integer> varLeafInputIds,
             HashMap<DataSourceScanOperator, ILogicalOperator> dataScanAndGroupByDistinctOps,
-            ILogicalOperator grpByDistinctOp, ILogicalOperator orderByOp, IOptimizationContext context)
-            throws AsterixException {
+            ILogicalOperator grpByDistinctOp, ILogicalOperator orderByOp, List<LogicalVariable> resultAndJoinVars,
+            IOptimizationContext context) throws AsterixException {
         this.singleDatasetPreds = new ArrayList<>();
         this.joinConditions = new ArrayList<>();
         this.joinHints = new HashMap<>();
@@ -170,6 +172,7 @@
         this.dataScanAndGroupByDistinctOps = dataScanAndGroupByDistinctOps;
         this.rootGroupByDistinctOp = grpByDistinctOp;
         this.rootOrderByOp = orderByOp;
+        this.resultAndJoinVars = resultAndJoinVars;
         this.op = op;
         this.forceJoinOrderMode = getForceJoinOrderMode(context);
         this.queryPlanShape = getQueryPlanShape(context);
@@ -911,7 +914,7 @@
                 parent.getInputs().get(0).setValue(deepCopyofScan);
                 // There are predicates here. So skip the predicates and get the original dataset card.
                 // Now apply all the predicates and get the card after all predicates are applied.
-                result = stats.runSamplingQuery(this.optCtx, leafInput);
+                result = stats.runSamplingQueryProjection(this.optCtx, leafInput);
                 double predicateCardinality = stats.findPredicateCardinality(result);
 
                 double projectedSize;
@@ -920,7 +923,7 @@
                 } else { // in case we did not get any tuples from the sample, get the size by setting the predicate to true.
                     ILogicalExpression saveExpr = selop.getCondition().getValue();
                     selop.getCondition().setValue(ConstantExpression.TRUE);
-                    result = stats.runSamplingQuery(this.optCtx, leafInput);
+                    result = stats.runSamplingQueryProjection(this.optCtx, leafInput);
                     double x = stats.findPredicateCardinality(result);
                     // better to check if x is 0
                     if (x == 0.0) {
@@ -1084,7 +1087,7 @@
                 SelectOperator selOp = new SelectOperator(new MutableObject<>(exp));
                 selOp.getInputs().add(new MutableObject<>(leafInput));
                 result = stats.runSamplingQuery(this.optCtx, selOp);
-                predicateCardinality = stats.findPredicateCardinality(result);
+                predicateCardinality = (double) ((AInt64) result.get(0).get(0)).getLongValue();
 
                 if (predicateCardinality == 0.0) {
                     predicateCardinality = 0.0001 * idxDetails.getSampleCardinalityTarget();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
index 95f2da6..9615386 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
@@ -42,6 +42,7 @@
 import org.apache.asterix.optimizer.base.AnalysisUtil;
 import org.apache.asterix.optimizer.rules.am.array.AbstractOperatorFromSubplanRewrite;
 import org.apache.asterix.translator.ConstantHelper;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -504,7 +505,7 @@
             }
         }
 
-        double predicateCardinality = findPredicateCardinality(result); // this routine knows how to look into the record inside result
+        double predicateCardinality = (double) ((AInt64) result.get(0).get(0)).getLongValue();
         if (predicateCardinality == 0.0) {
             predicateCardinality = 0.0001 * idxDetails.getSampleCardinalityTarget();
         }
@@ -527,7 +528,7 @@
             selOp.getCondition().setValue(ConstantExpression.TRUE);
             result = runSamplingQuery(optCtx, selOp);
             selOp.getCondition().setValue(saveExprs);
-            sampleCard = findPredicateCardinality(result);
+            sampleCard = (double) ((AInt64) result.get(0).get(0)).getLongValue();
         }
         // switch  the scanOp back
         parent.getInputs().get(0).setValue(scanOp);
@@ -563,6 +564,7 @@
         return projectedSize;
     }
 
+    // This one only gets the cardinality
     protected List<List<IAObject>> runSamplingQuery(IOptimizationContext ctx, ILogicalOperator logOp)
             throws AlgebricksException {
         LOGGER.info("***running sample query***");
@@ -570,6 +572,41 @@
         IOptimizationContext newCtx = ctx.getOptimizationContextFactory().cloneOptimizationContext(ctx);
 
         ILogicalOperator newScanOp = OperatorManipulationUtil.bottomUpCopyOperators(logOp);
+
+        List<Mutable<ILogicalExpression>> aggFunArgs = new ArrayList<>(1);
+        aggFunArgs.add(new MutableObject<>(ConstantExpression.TRUE));
+        BuiltinFunctionInfo countFn = BuiltinFunctions.getBuiltinFunctionInfo(BuiltinFunctions.COUNT);
+        AggregateFunctionCallExpression aggExpr = new AggregateFunctionCallExpression(countFn, false, aggFunArgs);
+
+        List<Mutable<ILogicalExpression>> aggExprList = new ArrayList<>(1);
+        aggExprList.add(new MutableObject<>(aggExpr));
+
+        List<LogicalVariable> aggVarList = new ArrayList<>(1);
+        LogicalVariable aggVar = newCtx.newVar();
+        aggVarList.add(aggVar);
+
+        AggregateOperator newAggOp = new AggregateOperator(aggVarList, aggExprList);
+        newAggOp.getInputs().add(new MutableObject<>(newScanOp));
+
+        Mutable<ILogicalOperator> newAggOpRef = new MutableObject<>(newAggOp);
+
+        OperatorPropertiesUtil.typeOpRec(newAggOpRef, newCtx);
+        LOGGER.info("***returning from sample query***");
+
+        String viewInPlan = new ALogicalPlanImpl(newAggOpRef).toString(); //useful when debugging
+        LOGGER.trace("viewInPlan");
+        LOGGER.trace(viewInPlan);
+        return AnalysisUtil.runQuery(newAggOpRef, Arrays.asList(aggVar), newCtx, IRuleSetFactory.RuleSetKind.SAMPLING);
+    }
+
+    // This one gets the cardinality and also projection sizes
+    protected List<List<IAObject>> runSamplingQueryProjection(IOptimizationContext ctx, ILogicalOperator logOp)
+            throws AlgebricksException {
+        LOGGER.info("***running sample query***");
+
+        IOptimizationContext newCtx = ctx.getOptimizationContextFactory().cloneOptimizationContext(ctx);
+
+        ILogicalOperator newScanOp = OperatorManipulationUtil.bottomUpCopyOperators(logOp);
         // Now we have to generate plans like this on top of the scanOp (logOp)
         // project ([$$79])
         // assign [$$79] <- [{"$1": $$73, "$2": $$74, "$3": $$75, "$4": $$76, "$5": $$77, "$6": $$78}]
@@ -577,8 +614,12 @@
         // assign [$$68, $$69, $$70, $$71, $$72] <- [serialized-size($$60), serialized-size($$str), serialized-size($$61), serialized-size($$65), serialized-size($$67)]
 
         // add the assign [$$56, ..., ] <- [encoded-size($$67), ..., ] on top of newAggOp
-        List<LogicalVariable> vars = new ArrayList<>();
-        VariableUtilities.getLiveVariables(logOp, vars);
+        List<LogicalVariable> vars1 = new ArrayList<>();
+        VariableUtilities.getLiveVariables(logOp, vars1); // all the variables in the leafInput
+        List<LogicalVariable> vars3 = // these variables can be thrown away as they are not present joins and in the final project
+                new ArrayList<>(CollectionUtils.subtract(vars1, joinEnum.resultAndJoinVars /* vars2 */));
+        List<LogicalVariable> vars = new ArrayList<>(CollectionUtils.subtract(vars1, vars3)); // variables that will flow up the tree
+
         LogicalVariable newVar;
         // array to keep track of the assigns
         List<LogicalVariable> newVars = new ArrayList<>();
@@ -602,7 +643,6 @@
         AssignOperator assignOp = new AssignOperator(newVars, exprs);
         assignOp.getInputs().add(new MutableObject<>(newScanOp));
         Mutable<ILogicalOperator> tmpRef = new MutableObject<>(assignOp);
-        String viewInPlan = new ALogicalPlanImpl(tmpRef).toString();
 
         // aggregate [$$73, $$74, $$75, $$76, $$77, $$78] <- [agg-count(true), sql-avg($$68), sql-avg($$69), sql-avg($$70), sql-avg($$71), sql-avg($$72)]
         // add the count-agg (true) first
@@ -649,14 +689,16 @@
         pOp.getInputs().add(new MutableObject<>(assignOp));
 
         Mutable<ILogicalOperator> Ref = new MutableObject<>(pOp);
-        LOGGER.info("***returning from sample query***");
 
         OperatorPropertiesUtil.typeOpRec(Ref, newCtx);
-        String viewInPlan3 = new ALogicalPlanImpl(Ref).toString(); //useful when debugging
-        LOGGER.trace("viewInPlan3");
-        LOGGER.trace(viewInPlan3);
-        return AnalysisUtil.runQuery(Ref, Arrays.asList(newVar), newCtx, IRuleSetFactory.RuleSetKind.SAMPLING);
+        if (LOGGER.isTraceEnabled()) {
+            String viewInPlan = new ALogicalPlanImpl(Ref).toString(); //useful when debugging
+            LOGGER.trace("sampling query before calling runQuery");
+            LOGGER.trace(viewInPlan);
+        }
 
+        LOGGER.info("***returning from sample query***");
+        return AnalysisUtil.runQuery(Ref, Arrays.asList(newVar), newCtx, IRuleSetFactory.RuleSetKind.SAMPLING);
     }
 
     private List<MutableObject> createMutableObjectArray(List<LogicalVariable> vars) {
@@ -721,7 +763,7 @@
         ILogicalOperator copyOfSelOp = OperatorManipulationUtil.bottomUpCopyOperators(selOp);
         if (setSampleDataSource(copyOfSelOp, sampleDataSource)) {
             List<List<IAObject>> result = runSamplingQuery(optCtx, copyOfSelOp);
-            sampleSize = (long) findPredicateCardinality(result);
+            sampleSize = (long) ((AInt64) result.get(0).get(0)).getLongValue();
         }
         return sampleSize;
     }
@@ -737,7 +779,7 @@
             if (setSampleDataSource(copyOfGrpByDistinctOp, sampleDataSource)) {
                 // get distinct cardinality from the sampling source
                 List<List<IAObject>> result = runSamplingQuery(optCtx, copyOfGrpByDistinctOp);
-                estDistCardinalityFromSample = findPredicateCardinality(result);
+                estDistCardinalityFromSample = (double) ((AInt64) result.get(0).get(0)).getLongValue();
             }
         }
         if (estDistCardinalityFromSample != -1.0) { // estimate distinct cardinality for the dataset from the sampled cardinality
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
index 7db992d..aa774b6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
@@ -46,12 +46,30 @@
                                   -- BTREE_SEARCH  |PARTITIONED|
                                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                     -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      project ([$$120, $$128, $$124]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      project ([$$124, $$120, $$128]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           join (eq($$123, $$136)) [cardinality: 248.35, op-cost: 398.35, total-cost: 2821.71]
-                                          -- HYBRID_HASH_JOIN [$$123][$$136]  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$136][$$123]  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
+                                              project ([$$124, $$136]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                select (and(lt($$121, "1994-01-01"), ge($$121, "1993-01-01"))) [cardinality: 248.35, op-cost: 0.0, total-cost: 1500.0]
+                                                -- STREAM_SELECT  |PARTITIONED|
+                                                  project ([$$124, $$136, $$121]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    assign [$$136, $$121] <- [$$o.getField(1), $$o.getField(4)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        data-scan []<-[$$124, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                             exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                             -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
                                               project ([$$120, $$128, $$123]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
@@ -88,24 +106,6 @@
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                            -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
-                                              project ([$$124, $$136]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                select (and(lt($$121, "1994-01-01"), ge($$121, "1993-01-01"))) [cardinality: 248.35, op-cost: 0.0, total-cost: 1500.0]
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  project ([$$124, $$136, $$121]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    assign [$$136, $$121] <- [$$o.getField(1), $$o.getField(4)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        data-scan []<-[$$124, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
-                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                         exchange [cardinality: 10.0, op-cost: 40.0, total-cost: 50.0]
                         -- BROADCAST_EXCHANGE  |PARTITIONED|
                           project ([$$130, $$127]) [cardinality: 10.0, op-cost: 0.0, total-cost: 10.0]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
index d1f713d..640f9b7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
@@ -19,21 +19,7 @@
                   exchange [cardinality: 8.0, op-cost: 0.0, total-cost: 45.0]
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                     join (eq($$33, $$34)) [cardinality: 8.0, op-cost: 15.0, total-cost: 45.0]
-                    -- HYBRID_HASH_JOIN [$$34][$$33]  |PARTITIONED|
-                      exchange [cardinality: 8.0, op-cost: 8.0, total-cost: 16.0]
-                      -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
-                        project ([$$39, $$34]) [cardinality: 8.0, op-cost: 0.0, total-cost: 8.0]
-                        -- STREAM_PROJECT  |PARTITIONED|
-                          assign [$$39] <- [$$p2.getField("name")] [cardinality: 8.0, op-cost: 0.0, total-cost: 8.0]
-                          -- ASSIGN  |PARTITIONED|
-                            exchange [cardinality: 8.0, op-cost: 8.0, total-cost: 16.0]
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              data-scan []<-[$$34, $$p2] <- test.ColumnDataset3 project ({name:any}) [cardinality: 8.0, op-cost: 8.0, total-cost: 8.0]
-                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- HYBRID_HASH_JOIN [$$33][$$34]  |PARTITIONED|
                       exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
                       -- HASH_PARTITION_EXCHANGE [$$33]  |PARTITIONED|
                         project ([$$38, $$33]) [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
@@ -48,3 +34,17 @@
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      exchange [cardinality: 8.0, op-cost: 8.0, total-cost: 16.0]
+                      -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
+                        project ([$$39, $$34]) [cardinality: 8.0, op-cost: 0.0, total-cost: 8.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$39] <- [$$p2.getField("name")] [cardinality: 8.0, op-cost: 0.0, total-cost: 8.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 8.0, op-cost: 8.0, total-cost: 16.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$34, $$p2] <- test.ColumnDataset3 project ({name:any}) [cardinality: 8.0, op-cost: 8.0, total-cost: 8.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|