[ASTERIXDB-3303][COMP] Projection Sizes Phase 2
Change-Id: Ia31e20ad92ad35598062649815e8f5c7e9695dbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17985
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vijay Sarathy <vijay.sarathy@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
index d9174d8..e748e55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinEnum.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.annotations.IndexedNLJoinExpressionAnnotation;
import org.apache.asterix.common.annotations.SecondaryIndexSearchPreferenceAnnotation;
+import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.metadata.declared.DataSource;
@@ -801,6 +802,8 @@
}
jn.setOrigCardinality(idxDetails.getSourceCardinality());
jn.setAvgDocSize(idxDetails.getSourceAvgItemSize());
+ jn.setSizeVarsFromDisk(10); // dummy value
+ jn.setSizeVarsAfterScan(10); // dummy value
}
// multiply by the respective predicate selectivities
jn.setCardinality(jn.origCardinality * stats.getSelectivity(leafInput, false));
@@ -903,33 +906,48 @@
LOGGER.trace(viewPlan);
}
+ // find if row or columnar format
+ DatasetDataSource dds = (DatasetDataSource) scanOp.getDataSource();
+ if (dds.getDataset().getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.ROW) {
+ jn.setColumnar(false);
+ }
+
SampleDataSource sampledatasource = getSampleDataSource(scanOp);
DataSourceScanOperator deepCopyofScan =
(DataSourceScanOperator) OperatorManipulationUtil.bottomUpCopyOperators(scanOp);
deepCopyofScan.setDataSource(sampledatasource);
-
+ LogicalVariable primaryKey;
+ if (deepCopyofScan.getVariables().size() > 1) {
+ primaryKey = deepCopyofScan.getVariables().get(1);
+ } else {
+ primaryKey = deepCopyofScan.getVariables().get(0);
+ }
// if there is only one conjunct, I do not have to call the sampling query during index selection!
// insert this in place of the scandatasourceOp.
parent.getInputs().get(0).setValue(deepCopyofScan);
// There are predicates here. So skip the predicates and get the original dataset card.
// Now apply all the predicates and get the card after all predicates are applied.
- result = stats.runSamplingQueryProjection(this.optCtx, leafInput);
+ result = stats.runSamplingQueryProjection(this.optCtx, leafInput, i, primaryKey);
double predicateCardinality = stats.findPredicateCardinality(result, true);
- double projectedSize;
+ double sizeVarsFromDisk;
+ double sizeVarsAfterScan;
+
if (predicateCardinality > 0.0) { // otherwise, we get nulls for the averages
- projectedSize = stats.findProjectedSize(result);
+ sizeVarsFromDisk = stats.findSizeVarsFromDisk(result, jn.getNumVarsFromDisk());
+ sizeVarsAfterScan = stats.findSizeVarsAfterScan(result, jn.getNumVarsFromDisk());
} else { // in case we did not get any tuples from the sample, get the size by setting the predicate to true.
ILogicalExpression saveExpr = selop.getCondition().getValue();
selop.getCondition().setValue(ConstantExpression.TRUE);
- result = stats.runSamplingQueryProjection(this.optCtx, leafInput);
+ result = stats.runSamplingQueryProjection(this.optCtx, leafInput, i, primaryKey);
double x = stats.findPredicateCardinality(result, true);
// better to check if x is 0
if (x == 0.0) {
- int fields = stats.numberOfFields(result);
- projectedSize = fields * 100; // cant think of anything better... cards are more important anyway
+ sizeVarsFromDisk = jn.getNumVarsFromDisk() * 100;
+ sizeVarsAfterScan = jn.getNumVarsAfterScan() * 100; // can't think of anything better... cards are more important anyway
} else {
- projectedSize = stats.findProjectedSize(result);
+ sizeVarsFromDisk = stats.findSizeVarsFromDisk(result, jn.getNumVarsFromDisk());
+ sizeVarsAfterScan = stats.findSizeVarsAfterScan(result, jn.getNumVarsFromDisk());
}
selop.getCondition().setValue(saveExpr); // restore the expression
}
@@ -952,7 +970,9 @@
if (jn.getCardinality() == jn.getOrigCardinality()) { // this means there was no selectivity hint provided
jn.setCardinality(finalDatasetCard);
}
- jn.setAvgDocSize(projectedSize);
+ jn.setSizeVarsFromDisk(sizeVarsFromDisk);
+ jn.setSizeVarsAfterScan(sizeVarsAfterScan);
+ jn.setAvgDocSize(idxDetails.getSourceAvgItemSize());
}
dataScanPlan = jn.addSingleDatasetPlans();
if (dataScanPlan == PlanNode.NO_PLAN) {
@@ -1075,6 +1095,7 @@
// replace the dataScanSourceOperator with the sampling source
SampleDataSource sampledatasource = getSampleDataSource(scanOp);
+
DataSourceScanOperator deepCopyofScan =
(DataSourceScanOperator) OperatorManipulationUtil.bottomUpCopyOperators(scanOp);
deepCopyofScan.setDataSource(sampledatasource);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
index 48d88a9..e8d96b3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/JoinNode.java
@@ -79,7 +79,10 @@
private ICost cheapestPlanCost;
protected double origCardinality; // without any selections
protected double cardinality;
- protected double size;
+ protected double size; // avg size of whole document; available from the sample
+ protected double diskProjectionSize; // what is coming out of the disk; in case of row format, it is the entire document
+ // in case of columnar we need to add sizes of individual fields.
+ protected double projectionSizeAfterScan; // excludes fields only used for selections
protected double distinctCardinality; // estimated distinct cardinality for this joinNode
protected List<Integer> planIndexesArray; // indexes into the PlanNode array in enumerateJoins
protected int jnIndex;
@@ -97,6 +100,11 @@
// The triple above is : Index, selectivity, and the index expression
protected static int NO_JN = -1;
private static int NO_CARDS = -2;
+ private int numVarsFromDisk = -1; // number of variables projected from disk
+ private int numVarsAfterScan = -1; // number of variables remaining after selection-only fields have been removed; these are needed for joins and the final project
+ private double sizeVarsFromDisk = -1.0;
+ private double sizeVarsAfterScan = -1.0;
+ private boolean columnar = true; // default
private JoinNode(int i) {
this.jnArrayIndex = i;
@@ -137,10 +145,14 @@
origCardinality = Math.max(card, Cost.MIN_CARD);
}
- protected void setAvgDocSize(double avgDocSize) {
+ public void setAvgDocSize(double avgDocSize) {
size = avgDocSize;
}
+ public double getAvgDocSize() {
+ return size;
+ }
+
public void setLimitVal(int val) {
limitVal = val;
}
@@ -150,11 +162,11 @@
}
public double getInputSize() {
- return size;
+ return sizeVarsFromDisk;
}
public double getOutputSize() {
- return size; // need to change this to account for projections
+ return sizeVarsAfterScan;
}
public JoinNode getLeftJn() {
@@ -177,6 +189,46 @@
return idxDetails;
}
+ public void setNumVarsFromDisk(int num) {
+ numVarsFromDisk = num;
+ }
+
+ public void setNumVarsAfterScan(int num) {
+ numVarsAfterScan = num;
+ }
+
+ public void setSizeVarsFromDisk(double size) {
+ sizeVarsFromDisk = size;
+ }
+
+ public void setSizeVarsAfterScan(double size) {
+ sizeVarsAfterScan = size;
+ }
+
+ public int getNumVarsFromDisk() {
+ return numVarsFromDisk;
+ }
+
+ public int getNumVarsAfterScan() {
+ return numVarsAfterScan;
+ }
+
+ public double getSizeVarsFromDisk() {
+ return sizeVarsFromDisk;
+ }
+
+ public double getSizeVarsAfterScan() {
+ return sizeVarsAfterScan;
+ }
+
+ public void setColumnar(boolean format) {
+ columnar = format;
+ }
+
+ public boolean getColumnar() {
+ return columnar;
+ }
+
private boolean nestedLoopsApplicable(ILogicalExpression joinExpr) throws AlgebricksException {
List<LogicalVariable> usedVarList = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
index b6bcd43..f3d89c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/Stats.java
@@ -553,13 +553,26 @@
return record.numberOfFields();
}
- // Can have null returned, so this routine should only be called if at least tuple is returned by the sample
- public double findProjectedSize(List<List<IAObject>> result) {
+ public double findSizeVarsFromDisk(List<List<IAObject>> result, int numDiskVars) {
ARecord record = (ARecord) (((IAObject) ((List<IAObject>) (result.get(0))).get(0)));
// Now figure out the projected size
double projectedSize = 0.0;
int fields = record.numberOfFields();
- for (int j = 1; j < fields; j++) {
+ for (int j = 1; j <= numDiskVars; j++) {
+ IAObject field = record.getValueByPos(j);
+ double size = ((double) ((ADouble) field).getDoubleValue());
+ projectedSize += size;
+ }
+ return projectedSize;
+ }
+
+ // Can have null returned, so this routine should only be called if at least one tuple is returned by the sample
+ public double findSizeVarsAfterScan(List<List<IAObject>> result, int numDiskVars) {
+ ARecord record = (ARecord) (((IAObject) ((List<IAObject>) (result.get(0))).get(0)));
+ // Now figure out the projected size
+ double projectedSize = 0.0;
+ int fields = record.numberOfFields();
+ for (int j = 1 + numDiskVars; j < fields; j++) { // must skip the disk vars
IAObject field = record.getValueByPos(j);
double size = ((double) ((ADouble) field).getDoubleValue());
projectedSize += size;
@@ -603,8 +616,8 @@
}
// This one gets the cardinality and also projection sizes
- protected List<List<IAObject>> runSamplingQueryProjection(IOptimizationContext ctx, ILogicalOperator logOp)
- throws AlgebricksException {
+ protected List<List<IAObject>> runSamplingQueryProjection(IOptimizationContext ctx, ILogicalOperator logOp,
+ int dataset, LogicalVariable primaryKey) throws AlgebricksException {
LOGGER.info("***running sample query***");
IOptimizationContext newCtx = ctx.getOptimizationContextFactory().cloneOptimizationContext(ctx);
@@ -619,9 +632,21 @@
// add the assign [$$56, ..., ] <- [encoded-size($$67), ..., ] on top of newAggOp
List<LogicalVariable> vars1 = new ArrayList<>();
VariableUtilities.getLiveVariables(logOp, vars1); // all the variables in the leafInput
+ // NOTE: this depends on the variable ordering here; we assume the first three variables come from the data scan operator.
+ if (!joinEnum.resultAndJoinVars.contains(primaryKey)) { // if the entire row is not being projected, we must remove $$p
+ vars1.remove(primaryKey);
+ }
+
List<LogicalVariable> vars3 = // these variables can be thrown away as they are not present joins and in the final project
new ArrayList<>(CollectionUtils.subtract(vars1, joinEnum.resultAndJoinVars /* vars2 */));
- List<LogicalVariable> vars = new ArrayList<>(CollectionUtils.subtract(vars1, vars3)); // variables that will flow up the tree
+ List<LogicalVariable> vars4 = new ArrayList<>(CollectionUtils.subtract(vars1, vars3)); // variables that will flow up the tree
+
+ List<LogicalVariable> vars = new ArrayList<>();
+ vars.addAll(vars1);
+ vars.addAll(vars4); // doing a union all; duplicates must not be removed
+ //vars1 is what comes out of the disk
+ joinEnum.jnArray[dataset].setNumVarsFromDisk(vars1.size());
+ joinEnum.jnArray[dataset].setNumVarsAfterScan(vars4.size()); // TODO(review): confirm numVarsAfterScan is actually consumed downstream
LogicalVariable newVar;
// array to keep track of the assigns
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
index aa774b6..7db992d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/cardinality-estimation/join-queries/join-queries.8.plan
@@ -46,30 +46,12 @@
-- BTREE_SEARCH |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- BROADCAST_EXCHANGE |PARTITIONED|
- project ([$$124, $$120, $$128]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ project ([$$120, $$128, $$124]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- STREAM_PROJECT |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
join (eq($$123, $$136)) [cardinality: 248.35, op-cost: 398.35, total-cost: 2821.71]
- -- HYBRID_HASH_JOIN [$$136][$$123] |PARTITIONED|
- exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- HASH_PARTITION_EXCHANGE [$$136] |PARTITIONED|
- project ([$$124, $$136]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- STREAM_PROJECT |PARTITIONED|
- select (and(lt($$121, "1994-01-01"), ge($$121, "1993-01-01"))) [cardinality: 248.35, op-cost: 0.0, total-cost: 1500.0]
- -- STREAM_SELECT |PARTITIONED|
- project ([$$124, $$136, $$121]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- STREAM_PROJECT |PARTITIONED|
- assign [$$136, $$121] <- [$$o.getField(1), $$o.getField(4)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- ASSIGN |PARTITIONED|
- exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$124, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$123][$$136] |PARTITIONED|
exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- HASH_PARTITION_EXCHANGE [$$123] |PARTITIONED|
project ([$$120, $$128, $$123]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
@@ -106,6 +88,24 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- HASH_PARTITION_EXCHANGE [$$136] |PARTITIONED|
+ project ([$$124, $$136]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ select (and(lt($$121, "1994-01-01"), ge($$121, "1993-01-01"))) [cardinality: 248.35, op-cost: 0.0, total-cost: 1500.0]
+ -- STREAM_SELECT |PARTITIONED|
+ project ([$$124, $$136, $$121]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$136, $$121] <- [$$o.getField(1), $$o.getField(4)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$124, $$o] <- tpch.Orders [cardinality: 1500.0, op-cost: 1500.0, total-cost: 1500.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
exchange [cardinality: 10.0, op-cost: 40.0, total-cost: 50.0]
-- BROADCAST_EXCHANGE |PARTITIONED|
project ([$$130, $$127]) [cardinality: 10.0, op-cost: 0.0, total-cost: 10.0]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
index 640f9b7..d1f713d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/field-access-pushdown/field-access-pushdown.008.plan
@@ -19,21 +19,7 @@
exchange [cardinality: 8.0, op-cost: 0.0, total-cost: 45.0]
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
join (eq($$33, $$34)) [cardinality: 8.0, op-cost: 15.0, total-cost: 45.0]
- -- HYBRID_HASH_JOIN [$$33][$$34] |PARTITIONED|
- exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
- -- HASH_PARTITION_EXCHANGE [$$33] |PARTITIONED|
- project ([$$38, $$33]) [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
- -- STREAM_PROJECT |PARTITIONED|
- assign [$$38] <- [$$p1.getField("age")] [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
- -- ASSIGN |PARTITIONED|
- exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$33, $$p1] <- test.ColumnDataset1 project ({age:any}) [cardinality: 7.0, op-cost: 7.0, total-cost: 7.0]
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$34][$$33] |PARTITIONED|
exchange [cardinality: 8.0, op-cost: 8.0, total-cost: 16.0]
-- HASH_PARTITION_EXCHANGE [$$34] |PARTITIONED|
project ([$$39, $$34]) [cardinality: 8.0, op-cost: 0.0, total-cost: 8.0]
@@ -48,3 +34,17 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
+ -- HASH_PARTITION_EXCHANGE [$$33] |PARTITIONED|
+ project ([$$38, $$33]) [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$38] <- [$$p1.getField("age")] [cardinality: 7.0, op-cost: 0.0, total-cost: 7.0]
+ -- ASSIGN |PARTITIONED|
+ exchange [cardinality: 7.0, op-cost: 7.0, total-cost: 14.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$33, $$p1] <- test.ColumnDataset1 project ({age:any}) [cardinality: 7.0, op-cost: 7.0, total-cost: 7.0]
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|