[ASTERIXDB-3303][COMP] Projection Sizes Phase 2

Change-Id: Ia31e20ad92ad35598062649815e8f5c7e9695dbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17985
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vijay Sarathy <vijay.sarathy@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index d9174d8..e748e55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -30,6 +30,7 @@
 
 import org.apache.asterix.common.annotations.IndexedNLJoinExpressionAnnotation;
 import org.apache.asterix.common.annotations.SecondaryIndexSearchPreferenceAnnotation;
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.declared.DataSource;
@@ -801,6 +802,8 @@
                     }
                     jn.setOrigCardinality(idxDetails.getSourceCardinality());
                     jn.setAvgDocSize(idxDetails.getSourceAvgItemSize());
+                    jn.setSizeVarsFromDisk(10); // dummy value
+                    jn.setSizeVarsAfterScan(10); // dummy value
                 }
                 // multiply by the respective predicate selectivities
                 jn.setCardinality(jn.origCardinality * stats.getSelectivity(leafInput, false));
@@ -903,33 +906,48 @@
                     LOGGER.trace(viewPlan);
                 }
 
+                // find if row or columnar format
+                DatasetDataSource dds = (DatasetDataSource) scanOp.getDataSource();
+                if (dds.getDataset().getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.ROW) {
+                    jn.setColumnar(false);
+                }
+
                 SampleDataSource sampledatasource = getSampleDataSource(scanOp);
                 DataSourceScanOperator deepCopyofScan =
                         (DataSourceScanOperator) OperatorManipulationUtil.bottomUpCopyOperators(scanOp);
                 deepCopyofScan.setDataSource(sampledatasource);
-
+                LogicalVariable primaryKey;
+                if (deepCopyofScan.getVariables().size() > 1) {
+                    primaryKey = deepCopyofScan.getVariables().get(1);
+                } else {
+                    primaryKey = deepCopyofScan.getVariables().get(0);
+                }
                 // if there is only one conjunct, I do not have to call the sampling query during index selection!
                 // insert this in place of the scandatasourceOp.
                 parent.getInputs().get(0).setValue(deepCopyofScan);
                 // There are predicates here. So skip the predicates and get the original dataset card.
                 // Now apply all the predicates and get the card after all predicates are applied.
-                result = stats.runSamplingQueryProjection(this.optCtx, leafInput);
+                result = stats.runSamplingQueryProjection(this.optCtx, leafInput, i, primaryKey);
                 double predicateCardinality = stats.findPredicateCardinality(result, true);
 
-                double projectedSize;
+                double sizeVarsFromDisk;
+                double sizeVarsAfterScan;
+
                 if (predicateCardinality > 0.0) { // otherwise, we get nulls for the averages
-                    projectedSize = stats.findProjectedSize(result);
+                    sizeVarsFromDisk = stats.findSizeVarsFromDisk(result, jn.getNumVarsFromDisk());
+                    sizeVarsAfterScan = stats.findSizeVarsAfterScan(result, jn.getNumVarsFromDisk());
                 } else { // in case we did not get any tuples from the sample, get the size by setting the predicate to true.
                     ILogicalExpression saveExpr = selop.getCondition().getValue();
                     selop.getCondition().setValue(ConstantExpression.TRUE);
-                    result = stats.runSamplingQueryProjection(this.optCtx, leafInput);
+                    result = stats.runSamplingQueryProjection(this.optCtx, leafInput, i, primaryKey);
                     double x = stats.findPredicateCardinality(result, true);
                     // better to check if x is 0
                     if (x == 0.0) {
-                        int fields = stats.numberOfFields(result);
-                        projectedSize = fields * 100; // cant think of anything better... cards are more important anyway
+                        sizeVarsFromDisk = jn.getNumVarsFromDisk() * 100;
+                        sizeVarsAfterScan = jn.getNumVarsAfterScan() * 100; // can't think of anything better... cards are more important anyway
                     } else {
-                        projectedSize = stats.findProjectedSize(result);
+                        sizeVarsFromDisk = stats.findSizeVarsFromDisk(result, jn.getNumVarsFromDisk());
+                        sizeVarsAfterScan = stats.findSizeVarsAfterScan(result, jn.getNumVarsFromDisk());
                     }
                     selop.getCondition().setValue(saveExpr); // restore the expression
                 }
@@ -952,7 +970,9 @@
                 if (jn.getCardinality() == jn.getOrigCardinality()) { // this means there was no selectivity hint provided
                     jn.setCardinality(finalDatasetCard);
                 }
-                jn.setAvgDocSize(projectedSize);
+                jn.setSizeVarsFromDisk(sizeVarsFromDisk);
+                jn.setSizeVarsAfterScan(sizeVarsAfterScan);
+                jn.setAvgDocSize(idxDetails.getSourceAvgItemSize());
             }
             dataScanPlan = jn.addSingleDatasetPlans();
             if (dataScanPlan == PlanNode.NO_PLAN) {
@@ -1075,6 +1095,7 @@
 
                 // replace the dataScanSourceOperator with the sampling source
                 SampleDataSource sampledatasource = getSampleDataSource(scanOp);
+
                 DataSourceScanOperator deepCopyofScan =
                         (DataSourceScanOperator) OperatorManipulationUtil.bottomUpCopyOperators(scanOp);
                 deepCopyofScan.setDataSource(sampledatasource);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
index 48d88a9..e8d96b3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
@@ -79,7 +79,10 @@
     private ICost cheapestPlanCost;
     protected double origCardinality; // without any selections
     protected double cardinality;
-    protected double size;
+    protected double size; // avg size of whole document; available from the sample
+    protected double diskProjectionSize; // what is coming out of the disk; in case of row format, it is the entire document
+                                         // in case of columnar we need to add sizes of individual fields.
+    protected double projectionSizeAfterScan; // excludes fields only used for selections
     protected double distinctCardinality; // estimated distinct cardinality for this joinNode
     protected List<Integer> planIndexesArray; // indexes into the PlanNode array in enumerateJoins
     protected int jnIndex;
@@ -97,6 +100,11 @@
     // The triple above is : Index, selectivity, and the index expression
     protected static int NO_JN = -1;
     private static int NO_CARDS = -2;
+    private int numVarsFromDisk = -1; // number of variables projected from disk
+    private int numVarsAfterScan = -1; // number of variables after all selection fields have been removed and are needed for joins and final projects
+    private double sizeVarsFromDisk = -1.0;
+    private double sizeVarsAfterScan = -1.0;
+    private boolean columnar = true; // default
 
     private JoinNode(int i) {
         this.jnArrayIndex = i;
@@ -137,10 +145,14 @@
         origCardinality = Math.max(card, Cost.MIN_CARD);
     }
 
-    protected void setAvgDocSize(double avgDocSize) {
+    public void setAvgDocSize(double avgDocSize) {
         size = avgDocSize;
     }
 
+    public double getAvgDocSize() {
+        return size;
+    }
+
     public void setLimitVal(int val) {
         limitVal = val;
     }
@@ -150,11 +162,11 @@
     }
 
     public double getInputSize() {
-        return size;
+        return sizeVarsFromDisk;
     }
 
     public double getOutputSize() {
-        return size; // need to change this to account for projections
+        return sizeVarsAfterScan;
     }
 
     public JoinNode getLeftJn() {
@@ -177,6 +189,46 @@
         return idxDetails;
     }
 
+    public void setNumVarsFromDisk(int num) {
+        numVarsFromDisk = num;
+    }
+
+    public void setNumVarsAfterScan(int num) {
+        numVarsAfterScan = num;
+    }
+
+    public void setSizeVarsFromDisk(double size) {
+        sizeVarsFromDisk = size;
+    }
+
+    public void setSizeVarsAfterScan(double size) {
+        sizeVarsAfterScan = size;
+    }
+
+    public int getNumVarsFromDisk() {
+        return numVarsFromDisk;
+    }
+
+    public int getNumVarsAfterScan() {
+        return numVarsAfterScan;
+    }
+
+    public double getSizeVarsFromDisk() {
+        return sizeVarsFromDisk;
+    }
+
+    public double getSizeVarsAfterScan() {
+        return sizeVarsAfterScan;
+    }
+
+    public void setColumnar(boolean format) {
+        columnar = format;
+    }
+
+    public boolean getColumnar() {
+        return columnar;
+    }
+
     private boolean nestedLoopsApplicable(ILogicalExpression joinExpr) throws AlgebricksException {
 
         List<LogicalVariable> usedVarList = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
index b6bcd43..f3d89c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
@@ -553,13 +553,26 @@
         return record.numberOfFields();
     }
 
-    // Can have null returned, so this routine should only be called if at least tuple is returned by the sample
-    public double findProjectedSize(List<List<IAObject>> result) {
+    public double findSizeVarsFromDisk(List<List<IAObject>> result, int numDiskVars) {
         ARecord record = (ARecord) (((IAObject) ((List<IAObject>) (result.get(0))).get(0)));
         // Now figure out the projected size
         double projectedSize = 0.0;
         int fields = record.numberOfFields();
-        for (int j = 1; j < fields; j++) {
+        for (int j = 1; j <= numDiskVars; j++) {
+            IAObject field = record.getValueByPos(j);
+            double size = ((double) ((ADouble) field).getDoubleValue());
+            projectedSize += size;
+        }
+        return projectedSize;
+    }
+
+    // Can have null returned, so this routine should only be called if at least one tuple is returned by the sample
+    public double findSizeVarsAfterScan(List<List<IAObject>> result, int numDiskVars) {
+        ARecord record = (ARecord) (((IAObject) ((List<IAObject>) (result.get(0))).get(0)));
+        // Now figure out the projected size
+        double projectedSize = 0.0;
+        int fields = record.numberOfFields();
+        for (int j = 1 + numDiskVars; j < fields; j++) { // must skip the disk vars
             IAObject field = record.getValueByPos(j);
             double size = ((double) ((ADouble) field).getDoubleValue());
             projectedSize += size;
@@ -603,8 +616,8 @@
     }
 
     // This one gets the cardinality and also projection sizes
-    protected List<List<IAObject>> runSamplingQueryProjection(IOptimizationContext ctx, ILogicalOperator logOp)
-            throws AlgebricksException {
+    protected List<List<IAObject>> runSamplingQueryProjection(IOptimizationContext ctx, ILogicalOperator logOp,
+            int dataset, LogicalVariable primaryKey) throws AlgebricksException {
         LOGGER.info("***running sample query***");
 
         IOptimizationContext newCtx = ctx.getOptimizationContextFactory().cloneOptimizationContext(ctx);
@@ -619,9 +632,21 @@
         // add the assign [$$56, ..., ] <- [encoded-size($$67), ..., ] on top of newAggOp
         List<LogicalVariable> vars1 = new ArrayList<>();
         VariableUtilities.getLiveVariables(logOp, vars1); // all the variables in the leafInput
+        // Depending on the order here. Assuming the first three variables are from the data scan operator.
+        if (!joinEnum.resultAndJoinVars.contains(primaryKey)) { // if the entire row is not being projected, we must remove $$p
+            vars1.remove(primaryKey);
+        }
+
         List<LogicalVariable> vars3 = // these variables can be thrown away as they are not present joins and in the final project
                 new ArrayList<>(CollectionUtils.subtract(vars1, joinEnum.resultAndJoinVars /* vars2 */));
-        List<LogicalVariable> vars = new ArrayList<>(CollectionUtils.subtract(vars1, vars3)); // variables that will flow up the tree
+        List<LogicalVariable> vars4 = new ArrayList<>(CollectionUtils.subtract(vars1, vars3)); // variables that will flow up the tree
+
+        List<LogicalVariable> vars = new ArrayList<>();
+        vars.addAll(vars1);
+        vars.addAll(vars4); // doing a union all; duplicates must not be removed
+        //vars1 is what comes out of the disk
+        joinEnum.jnArray[dataset].setNumVarsFromDisk(vars1.size());
+        joinEnum.jnArray[dataset].setNumVarsAfterScan(vars4.size()); // Is this used? check.
 
         LogicalVariable newVar;
         // array to keep track of the assigns
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
index aa774b6..7db992d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
@@ -46,30 +46,12 @@
                                   -- BTREE_SEARCH  |PARTITIONED|
                                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                     -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                      project ([$$124, $$120, $$128]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      project ([$$120, $$128, $$124]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                       -- STREAM_PROJECT  |PARTITIONED|
                                         exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           join (eq($$123, $$136)) [cardinality: 248.35, op-cost: 398.35, total-cost: 2821.71]
-                                          -- HYBRID_HASH_JOIN [$$136][$$123]  |PARTITIONED|
-                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                            -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
-                                              project ([$$124, $$136]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                select (and(lt($$121, "1994-01-01"), ge($$121, "1993-01-01"))) [cardinality: 248.35, op-cost: 0.0, total-cost: 1500.0]
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  project ([$$124, $$136, $$121]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    assign [$$136, $$121] <- [$$o.getField(1), $$o.getField(4)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                    -- ASSIGN  |PARTITIONED|
-                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        data-scan []<-[$$124, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
-                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$123][$$136]  |PARTITIONED|
                                             exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                             -- HASH_PARTITION_EXCHANGE [$$123]  |PARTITIONED|
                                               project ([$$120, $$128, $$123]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
@@ -106,6 +88,24 @@
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                                 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- HASH_PARTITION_EXCHANGE [$$136]  |PARTITIONED|
+                                              project ([$$124, $$136]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                select (and(lt($$121, "1994-01-01"), ge($$121, "1993-01-01"))) [cardinality: 248.35, op-cost: 0.0, total-cost: 1500.0]
+                                                -- STREAM_SELECT  |PARTITIONED|
+                                                  project ([$$124, $$136, $$121]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    assign [$$136, $$121] <- [$$o.getField(1), $$o.getField(4)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        data-scan []<-[$$124, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
+                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                         exchange [cardinality: 10.0, op-cost: 40.0, total-cost: 50.0]
                         -- BROADCAST_EXCHANGE  |PARTITIONED|
                           project ([$$130, $$127]) [cardinality: 10.0, op-cost: 0.0, total-cost: 10.0]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
index 640f9b7..d1f713d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
@@ -19,21 +19,7 @@
                   exchange [cardinality: 8.0, op-cost: 0.0, total-cost: 45.0]
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                     join (eq($$33, $$34)) [cardinality: 8.0, op-cost: 15.0, total-cost: 45.0]
-                    -- HYBRID_HASH_JOIN [$$33][$$34]  |PARTITIONED|
-                      exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
-                      -- HASH_PARTITION_EXCHANGE [$$33]  |PARTITIONED|
-                        project ([$$38, $$33]) [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
-                        -- STREAM_PROJECT  |PARTITIONED|
-                          assign [$$38] <- [$$p1.getField("age")] [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
-                          -- ASSIGN  |PARTITIONED|
-                            exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              data-scan []<-[$$33, $$p1] <- test.ColumnDataset1 project ({age:any}) [cardinality: 7.0, op-cost: 7.0, total-cost: 7.0]
-                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- HYBRID_HASH_JOIN [$$34][$$33]  |PARTITIONED|
                       exchange [cardinality: 8.0, op-cost: 8.0, total-cost: 16.0]
                       -- HASH_PARTITION_EXCHANGE [$$34]  |PARTITIONED|
                         project ([$$39, $$34]) [cardinality: 8.0, op-cost: 0.0, total-cost: 8.0]
@@ -48,3 +34,17 @@
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                   empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                   -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                      exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
+                      -- HASH_PARTITION_EXCHANGE [$$33]  |PARTITIONED|
+                        project ([$$38, $$33]) [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$38] <- [$$p1.getField("age")] [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$33, $$p1] <- test.ColumnDataset1 project ({age:any}) [cardinality: 7.0, op-cost: 7.0, total-cost: 7.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|