[ASTERIXDB-2933][COMP][EXT] Pushdowns Part2: Pushdown Rule
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Add a rule that computes the expected schema and sets it
to the DataSourceScanOperator. The computed schema is then
passed to the Parquet Reader to 'clip' the Parquet file's
schema. The resulting clipped schema is then used to tell
the reader what column should be read.
Interface changes:
- Change IProjectionInfo<List<T>> to IProjectionInfo<T>
Change-Id: If0c0d05473be72df6f08dfcbab2d25c36c71368e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12964
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wael Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java
index f25e058..e0bd22e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java
@@ -52,30 +52,10 @@
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
- * Pushes field-access expression to the external dataset scan to minimize the size of the record.
- * This rule currently does not remove the field access expression in ASSIGN and SCAN operators. Instead,
- * it adds the requested field names to external dataset details to produce records that only contain the requested
- * fields. Thus, no changes would occur in the plan's structure after firing this rule.
- * Example:
- * Before plan:
- * ...
- * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
- * ...
- * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
- * ...
- * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset
- * <p>
- * After plan:
- * ...
- * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
- * ...
- * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
- * ...
- * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset project (personalInfo.age, salary)
- * <p>
- * The resulting record $$r will be {"personalInfo":{"age": *AGE*}, "salary": *SALARY*}
- * and other fields will not be included in $$r.
+ * TODO Use {@link PushValueAccessToExternalDataScanRule}
+ * Will be removed in a follow up change
*/
+@Deprecated
public class PushFieldAccessToExternalDataScanRule implements IAlgebraicRewriteRule {
//Datasets payload variables
private final List<LogicalVariable> recordVariables = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessToExternalDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessToExternalDataScanRule.java
new file mode 100644
index 0000000..405e2bd
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessToExternalDataScanRule.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.Set;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
+import org.apache.asterix.optimizer.rules.pushdown.OperatorValueAccessPushdownVisitor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+/**
+ * Pushes value access expressions to the external dataset scan to minimize the size of the record.
+ * This rule currently does not remove the value access expression. Instead, it adds the requested field names to
+ * external dataset details to produce records that only contain the requested values. Thus, no changes would occur
+ * to the plan's structure after firing this rule.
+ * Example:
+ * Before plan:
+ * ...
+ * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
+ * ...
+ * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
+ * ...
+ * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset
+ * <p>
+ * After plan:
+ * ...
+ * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
+ * ...
+ * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
+ * ...
+ * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset project ({personalInfo:{age: VALUE},salary:VALUE})
+ * <p>
+ * The resulting record $$r will be {"personalInfo":{"age": *AGE*}, "salary": *SALARY*}
+ * and other fields will not be included in $$r.
+ */
+public class PushValueAccessToExternalDataScanRule implements IAlgebraicRewriteRule {
+ //Initially, assume we need to run the rule
+ private boolean run = true;
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ if (!context.getPhysicalOptimizationConfig().isExternalFieldPushdown() || !run) {
+ //The rule was fired, or value access pushdown is disabled
+ return false;
+ }
+
+ /*
+ * Only run the rewrite rule once and only if the plan contains a data-scan on an external dataset that
+ * support value access pushdown.
+ */
+ run = shouldRun(context);
+ if (run) {
+ run = false;
+ OperatorValueAccessPushdownVisitor visitor = new OperatorValueAccessPushdownVisitor(context);
+ opRef.getValue().accept(visitor, null);
+ visitor.finish();
+ }
+
+ //This rule does not do any actual structural changes to the plan
+ return false;
+ }
+
+ /**
+ * Check whether the plan contains an external dataset that supports pushdown
+ *
+ * @param context optimization context
+ * @return true if the plan contains such dataset, false otherwise
+ */
+ private boolean shouldRun(IOptimizationContext context) throws AlgebricksException {
+ ObjectSet<Int2ObjectMap.Entry<Set<DataSource>>> entrySet =
+ ((AsterixOptimizationContext) context).getDataSourceMap().int2ObjectEntrySet();
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ for (Int2ObjectMap.Entry<Set<DataSource>> dataSources : entrySet) {
+ for (DataSource dataSource : dataSources.getValue()) {
+ if (supportPushdown(metadataProvider, dataSource)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean supportPushdown(MetadataProvider metadataProvider, DataSource dataSource)
+ throws AlgebricksException {
+ DataverseName dataverse = dataSource.getId().getDataverseName();
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = metadataProvider.findDataset(dataverse, datasetName);
+
+ return dataset != null && dataset.getDatasetType() == DatasetConfig.DatasetType.EXTERNAL && ExternalDataUtils
+ .supportsPushdown(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
new file mode 100644
index 0000000..b7632db
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import static org.apache.asterix.optimizer.rules.pushdown.ExpressionValueAccessPushdownVisitor.ARRAY_FUNCTIONS;
+import static org.apache.asterix.optimizer.rules.pushdown.ExpressionValueAccessPushdownVisitor.SUPPORTED_FUNCTIONS;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AbstractComplexExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ArrayExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+import org.apache.asterix.runtime.projection.DataProjectionInfo;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+/**
+ * This class takes a value access expression and produces an expected schema (given the expression).
+ * Example:
+ * - $$t.getField("hashtags").getItem(0)
+ * We expect:
+ * 1- $$t is OBJECT
+ * 2- the output type of getField("hashtags") is ARRAY
+ * 3- the output type of getItem(0) is ANY node
+ */
+class ExpectedSchemaBuilder {
+ //Registered Variables
+ private final Map<LogicalVariable, IExpectedSchemaNode> varToNode;
+ private final ExpectedSchemaNodeToIATypeTranslatorVisitor typeBuilder;
+
+ public ExpectedSchemaBuilder() {
+ varToNode = new HashMap<>();
+ typeBuilder = new ExpectedSchemaNodeToIATypeTranslatorVisitor();
+ }
+
+ public DataProjectionInfo createProjectionInfo(LogicalVariable recordVariable) {
+ IExpectedSchemaNode rootNode = varToNode.get(recordVariable);
+ Map<String, FunctionCallInformation> sourceInformation = new HashMap<>();
+ typeBuilder.reset(sourceInformation);
+ ARecordType recordType = (ARecordType) rootNode.accept(typeBuilder, null);
+ return new DataProjectionInfo(recordType, sourceInformation);
+ }
+
+ public boolean setSchemaFromExpression(AbstractFunctionCallExpression expr, LogicalVariable producedVar) {
+ //Parent always nested
+ AbstractComplexExpectedSchemaNode parent = (AbstractComplexExpectedSchemaNode) buildNestedNode(expr);
+ if (parent != null) {
+ IExpectedSchemaNode leaf =
+ new AnyExpectedSchemaNode(parent, expr.getSourceLocation(), expr.getFunctionIdentifier().getName());
+ addChild(expr, parent, leaf);
+ if (producedVar != null) {
+ //Register the node if a variable is produced
+ varToNode.put(producedVar, leaf);
+ }
+ }
+ return parent != null;
+ }
+
+ public void registerDataset(LogicalVariable recordVar, RootExpectedSchemaNode rootNode) {
+ varToNode.put(recordVar, rootNode);
+ }
+
+ public void unregisterVariable(LogicalVariable variable) {
+ //Remove the node so no other expression will pushdown any expression in the future
+ IExpectedSchemaNode node = varToNode.remove(variable);
+ AbstractComplexExpectedSchemaNode parent = node.getParent();
+ if (parent == null) {
+ //It is a root node. Request the entire record
+ varToNode.put(variable, RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE);
+ } else {
+ //It is a nested node. Replace the node to a LEAF node
+ node.replaceIfNeeded(ExpectedSchemaNodeType.ANY, parent.getSourceLocation(), parent.getFunctionName());
+ }
+ }
+
+ public boolean isVariableRegistered(LogicalVariable recordVar) {
+ return varToNode.containsKey(recordVar);
+ }
+
+ public boolean containsRegisteredDatasets() {
+ return !varToNode.isEmpty();
+ }
+
+ private IExpectedSchemaNode buildNestedNode(ILogicalExpression expr) {
+ //The current node expression
+ AbstractFunctionCallExpression myExpr = (AbstractFunctionCallExpression) expr;
+ if (!SUPPORTED_FUNCTIONS.contains(myExpr.getFunctionIdentifier())) {
+ //Return null if the function is not supported.
+ return null;
+ }
+
+ //The parent expression
+ ILogicalExpression parentExpr = myExpr.getArguments().get(0).getValue();
+ if (isVariable(parentExpr)) {
+ //A variable could be the record's originated from data-scan or an expression from assign
+ LogicalVariable sourceVar = VariableUtilities.getVariable(parentExpr);
+ return changeNodeForVariable(sourceVar, myExpr);
+ }
+
+ //Recursively create the parent nodes. Parent is always a nested node
+ AbstractComplexExpectedSchemaNode newParent = (AbstractComplexExpectedSchemaNode) buildNestedNode(parentExpr);
+ //newParent could be null if the expression is not supported
+ if (newParent != null) {
+ //Parent expression must be a function call (as parent is a nested node)
+ AbstractFunctionCallExpression parentFuncExpr = (AbstractFunctionCallExpression) parentExpr;
+ //Get 'myType' as we will create the child type of the newParent
+ ExpectedSchemaNodeType myType = getExpectedNestedNodeType(myExpr);
+ /*
+ * Create 'myNode'. It is a nested node because the function is either getField() or supported array
+ * function
+ */
+ AbstractComplexExpectedSchemaNode myNode = AbstractComplexExpectedSchemaNode.createNestedNode(myType,
+ newParent, myExpr.getSourceLocation(), myExpr.getFunctionIdentifier().getName());
+ //Add myNode to the parent
+ addChild(parentFuncExpr, newParent, myNode);
+ return myNode;
+ }
+ return null;
+ }
+
+ private IExpectedSchemaNode changeNodeForVariable(LogicalVariable sourceVar,
+ AbstractFunctionCallExpression myExpr) {
+ //Get the associated node with the sourceVar (if any)
+ IExpectedSchemaNode oldNode = varToNode.get(sourceVar);
+ if (oldNode == null) {
+ //Variable is not associated with a node. No pushdown is possible
+ return null;
+ }
+ //What is the expected type of the variable
+ ExpectedSchemaNodeType varExpectedType = getExpectedNestedNodeType(myExpr);
+ // Get the node associated with the variable (or change its type if needed).
+ IExpectedSchemaNode newNode = oldNode.replaceIfNeeded(varExpectedType, myExpr.getSourceLocation(),
+ myExpr.getFunctionIdentifier().getName());
+ //Map the sourceVar to the node
+ varToNode.put(sourceVar, newNode);
+ return newNode;
+ }
+
+ private void addChild(AbstractFunctionCallExpression parentExpr, AbstractComplexExpectedSchemaNode parent,
+ IExpectedSchemaNode child) {
+ switch (parent.getType()) {
+ case OBJECT:
+ handleObject(parentExpr, parent, child);
+ break;
+ case ARRAY:
+ handleArray(parent, child);
+ break;
+ case UNION:
+ handleUnion(parentExpr, parent, child);
+ break;
+ default:
+ throw new IllegalStateException("Node " + parent.getType() + " is not nested");
+
+ }
+ }
+
+ private void handleObject(AbstractFunctionCallExpression parentExpr, AbstractComplexExpectedSchemaNode parent,
+ IExpectedSchemaNode child) {
+ ObjectExpectedSchemaNode objectNode = (ObjectExpectedSchemaNode) parent;
+ objectNode.addChild(ConstantExpressionUtil.getStringArgument(parentExpr, 1), child);
+ }
+
+ private void handleArray(AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child) {
+ ArrayExpectedSchemaNode arrayNode = (ArrayExpectedSchemaNode) parent;
+ arrayNode.addChild(child);
+ }
+
+ private void handleUnion(AbstractFunctionCallExpression parentExpr, AbstractComplexExpectedSchemaNode parent,
+ IExpectedSchemaNode child) {
+ UnionExpectedSchemaNode unionNode = (UnionExpectedSchemaNode) parent;
+ ExpectedSchemaNodeType parentType = getExpectedNestedNodeType(parentExpr);
+ addChild(parentExpr, unionNode.getChild(parentType), child);
+ }
+
+ private static ExpectedSchemaNodeType getExpectedNestedNodeType(AbstractFunctionCallExpression funcExpr) {
+ FunctionIdentifier fid = funcExpr.getFunctionIdentifier();
+ if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
+ return ExpectedSchemaNodeType.OBJECT;
+ } else if (ARRAY_FUNCTIONS.contains(fid)) {
+ return ExpectedSchemaNodeType.ARRAY;
+ }
+ throw new IllegalStateException("Function " + fid + " should not be pushed down");
+ }
+
+ private static boolean isVariable(ILogicalExpression expr) {
+ return expr.getExpressionTag() == LogicalExpressionTag.VARIABLE;
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
new file mode 100644
index 0000000..c746994
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AbstractComplexExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ArrayExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNodeVisitor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+import org.apache.asterix.runtime.projection.DataProjectionInfo;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+
+/**
+ * This visitor translates the {@link IExpectedSchemaNode} to {@link IAType} record.
+ * The {@link IAType#getTypeName()} is used to map each {@link IAType} to its {@link FunctionCallInformation}
+ */
+class ExpectedSchemaNodeToIATypeTranslatorVisitor implements IExpectedSchemaNodeVisitor<IAType, String> {
+ //Map typeName to source information
+ private Map<String, FunctionCallInformation> sourceInformationMap;
+ //To give a unique name for each type
+ private int counter;
+
+ public void reset(Map<String, FunctionCallInformation> sourceInformationMap) {
+ this.sourceInformationMap = sourceInformationMap;
+ }
+
+ @Override
+ public IAType visit(RootExpectedSchemaNode node, String arg) {
+ if (node.isAllFields()) {
+ return DataProjectionInfo.ALL_FIELDS_TYPE;
+ } else if (node.isEmpty()) {
+ return DataProjectionInfo.EMPTY_TYPE;
+ }
+ return createRecordType(node, String.valueOf(counter++));
+ }
+
+ @Override
+ public IAType visit(ObjectExpectedSchemaNode node, String arg) {
+ IAType recordType = createRecordType(node, arg);
+ sourceInformationMap.put(arg, createFunctionCallInformation(node));
+ return recordType;
+ }
+
+ @Override
+ public IAType visit(ArrayExpectedSchemaNode node, String arg) {
+ IAType itemType = node.getChild().accept(this, String.valueOf(counter++));
+ IAType listType = new AOrderedListType(itemType, arg);
+ sourceInformationMap.put(arg, createFunctionCallInformation(node));
+ return listType;
+ }
+
+ @Override
+ public IAType visit(UnionExpectedSchemaNode node, String arg) {
+ List<IAType> unionTypes = new ArrayList<>();
+ for (Map.Entry<ExpectedSchemaNodeType, AbstractComplexExpectedSchemaNode> child : node.getChildren()) {
+ unionTypes.add(child.getValue().accept(this, String.valueOf(counter++)));
+ }
+ IAType unionType = new AUnionType(unionTypes, arg);
+ sourceInformationMap.put(arg, createFunctionCallInformation(node));
+ return unionType;
+ }
+
+ @Override
+ public IAType visit(AnyExpectedSchemaNode node, String arg) {
+ return BuiltinType.ANY;
+ }
+
+ private ARecordType createRecordType(ObjectExpectedSchemaNode node, String arg) {
+ Set<Map.Entry<String, IExpectedSchemaNode>> children = node.getChildren();
+ String[] childrenFieldNames = new String[children.size()];
+ IAType[] childrenTypes = new IAType[children.size()];
+ int i = 0;
+ for (Map.Entry<String, IExpectedSchemaNode> child : children) {
+ childrenFieldNames[i] = child.getKey();
+ childrenTypes[i++] = child.getValue().accept(this, String.valueOf(counter++));
+ }
+
+ return new ARecordType(arg, childrenFieldNames, childrenTypes, true);
+ }
+
+ private FunctionCallInformation createFunctionCallInformation(IExpectedSchemaNode node) {
+ return new FunctionCallInformation(node.getFunctionName(), node.getSourceLocation());
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpressionValueAccessPushdownVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpressionValueAccessPushdownVisitor.java
new file mode 100644
index 0000000..5616f3f
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpressionValueAccessPushdownVisitor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+class ExpressionValueAccessPushdownVisitor implements ILogicalExpressionReferenceTransform {
+ //Set of supported type-check functions
+ static final Set<FunctionIdentifier> TYPE_CHECK_FUNCTIONS = createSupportedTypeCheckFunctions();
+ //Set of supported array functions
+ static final Set<FunctionIdentifier> ARRAY_FUNCTIONS = createSupportedArrayFunctions();
+ //Set of supported functions that we can pushdown
+ static final Set<FunctionIdentifier> SUPPORTED_FUNCTIONS = createSupportedFunctions();
+
+ private final ExpectedSchemaBuilder builder;
+ private List<LogicalVariable> producedVariables;
+ private int producedVariableIndex;
+
+ public ExpressionValueAccessPushdownVisitor(ExpectedSchemaBuilder builder) {
+ this.builder = builder;
+ end();
+ }
+
+ public void init(List<LogicalVariable> producedVariables) {
+ this.producedVariables = producedVariables;
+ producedVariableIndex = 0;
+ }
+
+ @Override
+ public boolean transform(Mutable<ILogicalExpression> expression) throws AlgebricksException {
+ if (producedVariableIndex == -1) {
+ //This for ensuring that the produced variables (if any) should be set
+ throw new IllegalStateException("init must be called first");
+ }
+ pushValueAccessExpression(expression, getNextProducedVariable());
+ return false;
+ }
+
+ public void end() {
+ producedVariables = null;
+ producedVariableIndex = -1;
+ }
+
+ private LogicalVariable getNextProducedVariable() {
+ LogicalVariable variable = producedVariables != null ? producedVariables.get(producedVariableIndex) : null;
+ producedVariableIndex++;
+ return variable;
+ }
+
+ /**
+ * Pushdown field access expressions and array access expressions down
+ */
+ private void pushValueAccessExpression(Mutable<ILogicalExpression> exprRef, LogicalVariable producedVar) {
+ final ILogicalExpression expr = exprRef.getValue();
+ if (skipPushdown(expr)) {
+ return;
+ }
+
+ final AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+
+ if (isSuccessfullyPushedDown(funcExpr, producedVar)) {
+ //We successfully pushed down the value access function
+ return;
+ }
+
+ //Check nested arguments if contains any pushable value access
+ pushValueAccessExpressionArg(funcExpr.getArguments());
+ }
+
+ /**
+ * Check if we can pushdown an expression. Also, unregister a variable if we found that a common expression value is
+ * required in its entirety.
+ */
+ private boolean skipPushdown(ILogicalExpression expr) {
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ LogicalVariable variable = VariableUtilities.getVariable(expr);
+ unregisterVariableIfNeeded(variable);
+ return true;
+ }
+ return expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL || !builder.containsRegisteredDatasets()
+ || isTypeCheckOnVariable(expr);
+ }
+
+ /**
+ * If the expression is a type-check function on a variable. We should stop as we do not want to unregister
+ * the variable used by the type-check function.
+ * <p>
+ * Example:
+ * SELECT p.personInfo.name
+ * FROM Person p
+ * WHERE p.personInfo IS NOT MISSING;
+ * <p>
+ * Plan:
+ * ...
+ * assign [$$17] <- [$$18.getField(\"name\")]
+ * select (not(is-missing($$18)))
+ * ...
+ * assign [$$18] <- [$$p.getField(\"personInfo\")]
+ * ...
+ * data-scan []<-[$$p] <- test.ParquetDataset project ({personInfo:{name:VALUE}})
+ * <p>
+ * In this case, is-missing($$18) could unregister $$18 since it requires the entire value (personInfo) and we
+ * won't be able to pushdown the access of (personInfo.name). This check would allow (personInfo.name) to be
+ * pushed down to data scan.
+ *
+ * @param expression expression
+ * @return if the function is a type-check function and has a variable argument.
+ */
+ private boolean isTypeCheckOnVariable(ILogicalExpression expression) {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression;
+ return TYPE_CHECK_FUNCTIONS.contains(funcExpr.getFunctionIdentifier())
+ && funcExpr.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE;
+ }
+
+ private void pushValueAccessExpressionArg(List<Mutable<ILogicalExpression>> exprList) {
+ for (Mutable<ILogicalExpression> exprRef : exprList) {
+ /*
+ * We need to set the produced variable as null here as the produced variable will not correspond to the
+ * nested expression.
+ */
+ pushValueAccessExpression(exprRef, null);
+ }
+ }
+
+ private boolean isSuccessfullyPushedDown(AbstractFunctionCallExpression funcExpr, LogicalVariable producedVar) {
+ return SUPPORTED_FUNCTIONS.contains(funcExpr.getFunctionIdentifier())
+ && builder.setSchemaFromExpression(funcExpr, producedVar);
+ }
+
+ private void unregisterVariableIfNeeded(LogicalVariable variable) {
+ if (builder.isVariableRegistered(variable)) {
+ builder.unregisterVariable(variable);
+ }
+ }
+
+ private static Set<FunctionIdentifier> createSupportedArrayFunctions() {
+ return Set.of(BuiltinFunctions.GET_ITEM, BuiltinFunctions.ARRAY_STAR, BuiltinFunctions.SCAN_COLLECTION);
+ }
+
+ private static Set<FunctionIdentifier> createSupportedFunctions() {
+ Set<FunctionIdentifier> supportedFunctions = new HashSet<>();
+ //For objects, only field-access-by-name is supported
+ supportedFunctions.add(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+ supportedFunctions.addAll(ARRAY_FUNCTIONS);
+ return supportedFunctions;
+ }
+
+ private static Set<FunctionIdentifier> createSupportedTypeCheckFunctions() {
+ return Set.of(BuiltinFunctions.IS_ARRAY, BuiltinFunctions.IS_OBJECT, BuiltinFunctions.IS_ATOMIC,
+ BuiltinFunctions.IS_NUMBER, BuiltinFunctions.IS_BOOLEAN, BuiltinFunctions.IS_STRING,
+ BuiltinFunctions.IS_MISSING, BuiltinFunctions.IS_NULL, BuiltinFunctions.IS_UNKNOWN);
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
new file mode 100644
index 0000000..6739384
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * This visitor visits the entire plan and tries to build the information of the required values from all dataset
+ */
+public class OperatorValueAccessPushdownVisitor implements ILogicalOperatorVisitor<Void, Void> {
+
+ private final IOptimizationContext context;
+ //Requested schema builder. It is only expected schema not a definite one
+ private final ExpectedSchemaBuilder builder;
+ //To visit every expression in each operator
+ private final ExpressionValueAccessPushdownVisitor pushdownVisitor;
+ //Datasets that allow pushdowns
+ private final Map<LogicalVariable, DataSourceScanOperator> registeredDatasets;
+ //visitedOperators so we do not visit the same operator twice (in case of REPLICATE)
+ private final Set<ILogicalOperator> visitedOperators;
+
+ public OperatorValueAccessPushdownVisitor(IOptimizationContext context) {
+ this.context = context;
+ builder = new ExpectedSchemaBuilder();
+ registeredDatasets = new HashMap<>();
+ pushdownVisitor = new ExpressionValueAccessPushdownVisitor(builder);
+ visitedOperators = new HashSet<>();
+ }
+
+ public void finish() {
+ for (Map.Entry<LogicalVariable, DataSourceScanOperator> scan : registeredDatasets.entrySet()) {
+ scan.getValue().setProjectionInfo(builder.createProjectionInfo(scan.getKey()));
+ }
+ }
+
+ /**
+ * Visit every input of an operator. Then, start pushdown any value expression that the operator has
+ *
+ * @param op the operator to process
+ * @param producedVariables any produced variables by the operator. We only care about the {@link AssignOperator}
+ * and {@link UnnestOperator} variables for now.
+ */
+ private void visitInputs(ILogicalOperator op, List<LogicalVariable> producedVariables) throws AlgebricksException {
+ if (visitedOperators.contains(op)) {
+ return;
+ }
+ for (Mutable<ILogicalOperator> child : op.getInputs()) {
+ child.getValue().accept(this, null);
+ }
+ visitedOperators.add(op);
+ //Initiate the pushdown visitor
+ pushdownVisitor.init(producedVariables);
+ //pushdown any expression the operator has
+ op.acceptExpressionTransform(pushdownVisitor);
+ pushdownVisitor.end();
+ }
+
+ /*
+ * ******************************************************************************
+ * Operators that need to handle special cases
+ * ******************************************************************************
+ */
+
+ @Override
+ public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ if (op.getVariables().isEmpty()) {
+ //If the variables are empty and the next operator is DataSourceScanOperator, then set empty record
+ setEmptyRecord(op.getInputs().get(0).getValue());
+ }
+ return null;
+ }
+
+ /**
+ * From the {@link DataSourceScanOperator}, we need to register the payload variable (record variable) to check
+ * which expression in the plan is using it.
+ */
+ @Override
+ public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(op);
+ if (datasetDataSource != null) {
+ LogicalVariable recordVar = datasetDataSource.getDataRecordVariable(op.getVariables());
+ if (!builder.isVariableRegistered(recordVar)) {
+ /*
+ * This is the first time we see the dataset, and we know we might only need part of the record.
+ * Register the dataset to prepare for value access expression pushdowns.
+ * Initially, we will request the entire record.
+ */
+ builder.registerDataset(recordVar, RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE);
+ registeredDatasets.put(recordVar, op);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ if (!op.isGlobal() && isCountConstant(op.getExpressions())) {
+ /*
+ * Optimize the SELECT COUNT(*) case
+ * It is local aggregate and has agg-sql-count function with a constant argument. Set empty record if the
+ * input operator is DataSourceScanOperator
+ */
+ setEmptyRecord(op.getInputs().get(0).getValue());
+ }
+ return null;
+ }
+
+ /*
+ * ******************************************************************************
+ * Helper methods
+ * ******************************************************************************
+ */
+
+ /**
+ * The role of this method is:
+ * 1- Check whether the dataset is an external dataset and allows value access pushdowns
+ * 2- return the actual DatasetDataSource
+ */
+ private DatasetDataSource getDatasetDataSourceIfApplicable(DataSourceScanOperator scan) throws AlgebricksException {
+ DataSource dataSource = (DataSource) scan.getDataSource();
+ if (dataSource == null) {
+ return null;
+ }
+
+ MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+ DataverseName dataverse = dataSource.getId().getDataverseName();
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = mp.findDataset(dataverse, datasetName);
+
+ //Only external dataset can have pushed down expressions
+ if (dataset == null || dataset.getDatasetType() == DatasetConfig.DatasetType.INTERNAL
+ || dataset.getDatasetType() == DatasetConfig.DatasetType.EXTERNAL && !ExternalDataUtils
+ .supportsPushdown(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties())) {
+ return null;
+ }
+
+ return (DatasetDataSource) dataSource;
+ }
+
+ /**
+ * If the inputOp is a {@link DataSourceScanOperator}, then set the projected value needed as empty record
+ *
+ * @param inputOp an operator that is potentially a {@link DataSourceScanOperator}
+ * @see #visitAggregateOperator(AggregateOperator, Void)
+ * @see #visitProjectOperator(ProjectOperator, Void)
+ */
+ private void setEmptyRecord(ILogicalOperator inputOp) throws AlgebricksException {
+ if (inputOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+ DataSourceScanOperator scan = (DataSourceScanOperator) inputOp;
+ DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(scan);
+ if (datasetDataSource != null) {
+ //We know that we only need the count of objects. So return empty objects only
+ LogicalVariable recordVar = datasetDataSource.getDataRecordVariable(scan.getVariables());
+ /*
+ * Set the root node as EMPTY_ROOT_NODE (i.e., no fields will be read from disk). We register the
+ * dataset with EMPTY_ROOT_NODE so that we skip pushdowns on empty node.
+ */
+ builder.registerDataset(recordVar, RootExpectedSchemaNode.EMPTY_ROOT_NODE);
+ }
+ }
+ }
+
+ private boolean isCountConstant(List<Mutable<ILogicalExpression>> expressions) {
+ if (expressions.size() != 1) {
+ return false;
+ }
+ ILogicalExpression expression = expressions.get(0).getValue();
+ if (expression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression;
+ FunctionIdentifier fid = funcExpr.getFunctionIdentifier();
+ return BuiltinFunctions.SQL_COUNT.equals(fid)
+ && funcExpr.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT;
+ }
+
+ private void visitSubplans(List<ILogicalPlan> nestedPlans) throws AlgebricksException {
+ for (ILogicalPlan plan : nestedPlans) {
+ for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+ visitInputs(root.getValue());
+ }
+ }
+ }
+
+ /*
+ * ******************************************************************************
+ * Pushdown when possible for each operator
+ * ******************************************************************************
+ */
+
+ @Override
+ public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ return null;
+ }
+
+ @Override
+ public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ visitSubplans(op.getNestedPlans());
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ return null;
+ }
+
+ @Override
+ public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ visitSubplans(op.getNestedPlans());
+ return null;
+ }
+
+ @Override
+ public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ visitSubplans(op.getNestedPlans());
+ return null;
+ }
+
+ private void visitInputs(ILogicalOperator op) throws AlgebricksException {
+ visitInputs(op, null);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java
index fcbf522..9e8247e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java
@@ -26,7 +26,12 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-public class ExternalDataProjectionInfo implements IProjectionInfo<List<String>> {
+/**
+ * TODO Use {@link org.apache.asterix.runtime.projection.DataProjectionInfo}
+ * Will be removed in a follow up change
+ */
+@Deprecated
+public class ExternalDataProjectionInfo implements IProjectionInfo<List<List<String>>> {
private final List<List<String>> projectedFieldNames;
public ExternalDataProjectionInfo() {
@@ -47,7 +52,7 @@
}
@Override
- public IProjectionInfo<List<String>> createCopy() {
+ public IProjectionInfo<List<List<String>>> createCopy() {
return new ExternalDataProjectionInfo(projectedFieldNames);
}
@@ -61,6 +66,7 @@
&& VariableUtilities.varListEqualUnordered(projectedFieldNames, otherProjectedFieldNames);
}
+ @Override
public String toString() {
if (projectedFieldNames.isEmpty()) {
return "";
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/DataProjectionInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/DataProjectionInfo.java
new file mode 100644
index 0000000..de402ec
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/DataProjectionInfo.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+
+public class DataProjectionInfo implements IProjectionInfo<ARecordType> {
+ //Default open record type when requesting the entire fields
+ public static final ARecordType ALL_FIELDS_TYPE = createType("");
+ //Default open record type when requesting none of the fields
+ public static final ARecordType EMPTY_TYPE = createType("{}");
+
+ private final ARecordType root;
+ private final Map<String, FunctionCallInformation> functionCallInfoMap;
+
+ public DataProjectionInfo(ARecordType root, Map<String, FunctionCallInformation> sourceInformationMap) {
+ this.root = root;
+ this.functionCallInfoMap = sourceInformationMap;
+ }
+
+ private DataProjectionInfo(DataProjectionInfo other) {
+ if (other.root == ALL_FIELDS_TYPE) {
+ root = ALL_FIELDS_TYPE;
+ } else if (other.root == EMPTY_TYPE) {
+ root = EMPTY_TYPE;
+ } else {
+ root = other.root.deepCopy(other.root);
+ }
+ functionCallInfoMap = new HashMap<>(other.functionCallInfoMap);
+ }
+
+ @Override
+ public ARecordType getProjectionInfo() {
+ return root;
+ }
+
+ @Override
+ public DataProjectionInfo createCopy() {
+ return new DataProjectionInfo(this);
+ }
+
+ public Map<String, FunctionCallInformation> getFunctionCallInfoMap() {
+ return functionCallInfoMap;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DataProjectionInfo otherInfo = (DataProjectionInfo) o;
+ return root.deepEqual(otherInfo.root) && Objects.equals(functionCallInfoMap, otherInfo.functionCallInfoMap);
+ }
+
+ @Override
+ public String toString() {
+ if (root == ALL_FIELDS_TYPE || root == EMPTY_TYPE) {
+ //Return the type name if all fields or empty types
+ return root.getTypeName();
+ }
+ //Return a oneliner JSON like representation for the requested fields
+ StringBuilder builder = new StringBuilder();
+ SimpleStringBuilderForIATypeVisitor visitor = new SimpleStringBuilderForIATypeVisitor();
+ root.accept(visitor, builder);
+ return builder.toString();
+ }
+
+ /**
+ * Serialize expected record type
+ *
+ * @param expectedRecordType expected record type
+ * @param output data output
+ */
+ public static void writeTypeField(ARecordType expectedRecordType, DataOutput output) throws IOException {
+ byte[] recordTypeBytes = SerializationUtils.serialize(expectedRecordType);
+ output.writeInt(recordTypeBytes.length);
+ output.write(recordTypeBytes);
+ }
+
+ /**
+ * Deserialize expected record type
+ *
+ * @param input data input
+ * @return deserialized expected record type
+ */
+ public static ARecordType createTypeField(DataInput input) throws IOException {
+ int length = input.readInt();
+ byte[] recordTypeBytes = new byte[length];
+ input.readFully(recordTypeBytes, 0, length);
+ return SerializationUtils.deserialize(recordTypeBytes);
+ }
+
+ /**
+ * Serialize function call information map
+ *
+ * @param functionCallInfoMap function information map
+ * @param output data output
+ */
+ public static void writeFunctionCallInformationMapField(Map<String, FunctionCallInformation> functionCallInfoMap,
+ DataOutput output) throws IOException {
+ output.writeInt(functionCallInfoMap.size());
+ for (Map.Entry<String, FunctionCallInformation> info : functionCallInfoMap.entrySet()) {
+ output.writeUTF(info.getKey());
+ info.getValue().writeFields(output);
+ }
+ }
+
+ /**
+ * Deserialize function call information map
+ *
+ * @param input data input
+ * @return deserialized function call information map
+ */
+ public static Map<String, FunctionCallInformation> createFunctionCallInformationMap(DataInput input)
+ throws IOException {
+ int size = input.readInt();
+ Map<String, FunctionCallInformation> functionCallInfoMap = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ String key = input.readUTF();
+ FunctionCallInformation functionCallInfo = FunctionCallInformation.create(input);
+ functionCallInfoMap.put(key, functionCallInfo);
+ }
+ return functionCallInfoMap;
+ }
+
+ private static ARecordType createType(String typeName) {
+ return new ARecordType(typeName, new String[] {}, new IAType[] {}, true);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/FunctionCallInformation.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/FunctionCallInformation.java
new file mode 100644
index 0000000..5cb26fd
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/FunctionCallInformation.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.exceptions.ExceptionUtil;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+/**
+ * Function call information that holds {@link FunctionIdentifier#getName()} and {@link SourceLocation}
+ */
+public class FunctionCallInformation implements Serializable {
+ private static final long serialVersionUID = -7884346933746232736L;
+ private final String functionName;
+ private final SourceLocation sourceLocation;
+ private Set<ATypeTag> typeMismatches;
+
+ public FunctionCallInformation(String functionName, SourceLocation sourceLocation) {
+ this(functionName, sourceLocation, Collections.emptySet());
+ }
+
+ private FunctionCallInformation(String functionName, SourceLocation sourceLocation, Set<ATypeTag> typeMismatches) {
+ this.functionName = functionName;
+ this.sourceLocation = sourceLocation;
+ this.typeMismatches = typeMismatches;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ public SourceLocation getSourceLocation() {
+ return sourceLocation;
+ }
+
+ public Warning createTypeMismatchWarning(ATypeTag expectedType, ATypeTag actualType) {
+ if (typeMismatches == null) {
+ typeMismatches = EnumSet.noneOf(ATypeTag.class);
+ } else if (typeMismatches.contains(actualType)) {
+ //We already issued a warning containing the same actual type. So, we ignore it
+ return null;
+ }
+ typeMismatches.add(actualType);
+ return Warning.of(getSourceLocation(), ErrorCode.TYPE_MISMATCH_FUNCTION, getFunctionName(),
+ ExceptionUtil.indexToPosition(0), expectedType, actualType);
+ }
+
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeUTF(functionName);
+ SourceLocation.writeFields(sourceLocation, output);
+ output.writeInt(typeMismatches.size());
+ for (ATypeTag typeTag : typeMismatches) {
+ output.write(typeTag.serialize());
+ }
+ }
+
+ public static FunctionCallInformation create(DataInput in) throws IOException {
+ String functionName = in.readUTF();
+ SourceLocation sourceLocation = SourceLocation.create(in);
+ int typeMismatchesLength = in.readInt();
+ Set<ATypeTag> typeMismatches = EnumSet.noneOf(ATypeTag.class);
+ for (int i = 0; i < typeMismatchesLength; i++) {
+ typeMismatches.add(ATypeTag.VALUE_TYPE_MAPPING[in.readByte()]);
+ }
+ return new FunctionCallInformation(functionName, sourceLocation, typeMismatches);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(functionName, sourceLocation);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FunctionCallInformation that = (FunctionCallInformation) o;
+ return Objects.equals(functionName, that.functionName) && Objects.equals(sourceLocation, that.sourceLocation);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
index 9de591e..3c1a24d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
@@ -18,17 +18,15 @@
*/
package org.apache.hyracks.algebricks.core.algebra.metadata;
-import java.util.List;
-
/**
* Generic interface to include the projection information for
* {@link org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator}
*/
public interface IProjectionInfo<T> {
/**
- * @return list of projected values' information
+ * @return projected values' information
*/
- List<T> getProjectionInfo();
+ T getProjectionInfo();
/**
* @return a copy of the {@link IProjectionInfo}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 059a357..69db58d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -30,8 +30,11 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -179,4 +182,10 @@
return varSet.equals(varArgSet);
}
+ public static LogicalVariable getVariable(ILogicalExpression expr) {
+ if (expr != null && expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ return ((VariableReferenceExpression) expr).getVariableReference();
+ }
+ return null;
+ }
}