[ASTERIXDB-3229][COMP] Part 1: Introduce DefUseChain computer
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
To generalize the pushdown effort to include
dynamic prefixes, we need to have a general approach
for handling pushdowns (e.g., value-access pushdown,
range-filter pushdown, predicate pushdown). Each one
of those pushdowns requires certain information about
expressions like where they were defined and where they
were used. This patch introduces a way to collect such
information.
Change-Id: I985a779fe74a57cccbe1e625cec4ff69cda84924
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17655
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
index 36a77c9..edebed9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
@@ -37,6 +37,7 @@
import org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
import org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
import org.apache.asterix.runtime.projection.DataProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.commons.lang3.mutable.Mutable;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
new file mode 100644
index 0000000..878d4b1
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Holds the def-use chains collected while traversing a query plan for pushdown purposes.
+ * <p>
+ * Every variable registered via {@link #registerScan} or {@link #define} gets a {@link DefineDescriptor} and a
+ * list of the {@link UseDescriptor}s recorded for it via {@link #use}. Each descriptor is stamped with the
+ * number of scope operators entered so far (see {@link #enterScope}).
+ */
+public class PushdownContext {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final Set<LogicalOperatorTag> SCOPE_OPERATORS = getScopeOperators();
+    private final List<ScanDefineDescriptor> registeredScans;
+    // For debugging purposes only
+    private final Map<ILogicalExpression, DefineDescriptor> definedVariable;
+    private final Map<LogicalVariable, DefineDescriptor> defineChain;
+    private final Map<LogicalVariable, List<UseDescriptor>> useChain;
+    private final List<ILogicalOperator> scopes;
+    // Keyed by the use (not by its operator) as a single operator may carry several expressions
+    private final Map<UseDescriptor, ILogicalExpression> inlinedCache;
+
+    public PushdownContext() {
+        registeredScans = new ArrayList<>();
+        definedVariable = new HashMap<>();
+        defineChain = new HashMap<>();
+        useChain = new HashMap<>();
+        scopes = new ArrayList<>();
+        inlinedCache = new HashMap<>();
+    }
+
+    /**
+     * Records the operator as a new scope if it is one of the scope operators (inner/left-outer join, group-by,
+     * aggregate and window); any other operator is ignored.
+     *
+     * @param operator operator being entered
+     */
+    public void enterScope(ILogicalOperator operator) {
+        if (SCOPE_OPERATORS.contains(operator.getOperatorTag())) {
+            scopes.add(operator);
+        }
+    }
+
+    /**
+     * Registers a scan and makes it the definition of its record variable, its meta variable (if any) and all
+     * of its primary key variables.
+     *
+     * @param dataset        dataset of the scan
+     * @param pkList         primary key variables of the scan
+     * @param recordVariable record variable of the scan
+     * @param metaVariable   meta variable of the scan, or {@code null} if there is none
+     * @param scanOperator   the scan operator
+     */
+    public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
+            LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
+        ScanDefineDescriptor scanDefDesc =
+                new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
+        defineChain.put(recordVariable, scanDefDesc);
+        useChain.put(recordVariable, new ArrayList<>());
+        if (metaVariable != null) {
+            defineChain.put(metaVariable, scanDefDesc);
+            useChain.put(metaVariable, new ArrayList<>());
+        }
+        for (LogicalVariable pkVar : pkList) {
+            defineChain.put(pkVar, scanDefDesc);
+            useChain.put(pkVar, new ArrayList<>());
+        }
+        registeredScans.add(scanDefDesc);
+    }
+
+    /**
+     * Records that the operator's expression at the given index defines the variable. A variable that already
+     * has a definition is left untouched (a warning is logged).
+     *
+     * @param variable        defined variable
+     * @param operator        operator that produces the variable
+     * @param expression      expression assigned to the variable
+     * @param expressionIndex index of the expression within the operator
+     */
+    public void define(LogicalVariable variable, ILogicalOperator operator, ILogicalExpression expression,
+            int expressionIndex) {
+        if (defineChain.containsKey(variable)) {
+            LOGGER.warn("Variable {} declared twice", variable);
+            return;
+        }
+
+        DefineDescriptor previous = definedVariable.get(expression);
+        if (previous != null) {
+            /*
+             * Currently, we know that scan-collection of some constant array can appear multiple times as REPLICATE
+             * wasn't fired yet to remove common operators. However, this log is to track any issue that could occur
+             * due to having redundant expressions declared in different operators.
+             */
+            // Pass the descriptor itself so that it is only rendered when debug logging is enabled
+            LOGGER.debug("Expression {} is redundant. It was seen at {}", expression, previous);
+        }
+
+        DefineDescriptor defineDescriptor =
+                new DefineDescriptor(scopes.size(), variable, operator, expression, expressionIndex);
+        definedVariable.put(expression, defineDescriptor);
+        defineChain.put(variable, defineDescriptor);
+        useChain.put(variable, new ArrayList<>());
+    }
+
+    /**
+     * Records a use of every variable referenced by the expression.
+     *
+     * @param operator         operator that owns the expression
+     * @param expression       expression that uses the variables
+     * @param expressionIndex  index of the expression within the operator
+     * @param producedVariable variable the expression is assigned to, or {@code null} if it produces none
+     */
+    public void use(ILogicalOperator operator, ILogicalExpression expression, int expressionIndex,
+            LogicalVariable producedVariable) {
+        int scope = scopes.size();
+        UseDescriptor useDescriptor = new UseDescriptor(scope, operator, expression, expressionIndex, producedVariable);
+        Set<LogicalVariable> usedVariables = useDescriptor.getUsedVariables();
+        expression.getUsedVariables(usedVariables);
+        for (LogicalVariable variable : usedVariables) {
+            DefineDescriptor defineDescriptor = defineChain.get(variable);
+            if (defineDescriptor == null) {
+                // Log to track any definition that we may have missed
+                LOGGER.warn("Variable {} is not defined", variable);
+                // NOTE(review): variables visited before this one keep the use while the remaining ones never get
+                // it -- confirm whether skipping only the undefined variable is the intended behavior
+                return;
+            }
+
+            useChain.get(variable).add(useDescriptor);
+        }
+    }
+
+    /**
+     * @param variable variable to look up
+     * @return the definition of the variable, or {@code null} if none was recorded
+     */
+    public DefineDescriptor getDefineDescriptor(LogicalVariable variable) {
+        return defineChain.get(variable);
+    }
+
+    /**
+     * @param useDescriptor use to look up
+     * @return the definition of the variable produced by the use, or {@code null} if the use produces no
+     *         variable or the variable has no recorded definition
+     */
+    public DefineDescriptor getDefineDescriptor(UseDescriptor useDescriptor) {
+        LogicalVariable producedVariable = useDescriptor.getProducedVariable();
+        return producedVariable == null ? null : getDefineDescriptor(producedVariable);
+    }
+
+    /**
+     * @param defineDescriptor definition to look up
+     * @return the uses recorded for the defined variable (the internal, live list)
+     */
+    public List<UseDescriptor> getUseDescriptors(DefineDescriptor defineDescriptor) {
+        return useChain.get(defineDescriptor.getVariable());
+    }
+
+    /**
+     * @return all scans registered so far, in registration order (the internal, live list)
+     */
+    public List<ScanDefineDescriptor> getRegisteredScans() {
+        return registeredScans;
+    }
+
+    /**
+     * Returns a copy of the use's expression in which variable references are replaced by the expressions
+     * that define them, down to the variables defined by a scan.
+     *
+     * @param useDescriptor use whose expression should be inlined
+     * @return a fresh clone of the inlined expression on every call
+     * @throws CompilationException if an unsupported expression kind is encountered
+     */
+    public ILogicalExpression cloneAndInlineExpression(UseDescriptor useDescriptor) throws CompilationException {
+        ILogicalExpression inlinedExpr = inlinedCache.get(useDescriptor);
+        if (inlinedExpr == null) {
+            inlinedExpr = cloneAndInline(useDescriptor.getExpression());
+            inlinedCache.put(useDescriptor, inlinedExpr);
+        }
+
+        // Clone the cached expression as a processor may change it
+        return inlinedExpr.cloneExpression();
+    }
+
+    /**
+     * Recursively replaces variable references with the expressions that define them.
+     */
+    private ILogicalExpression cloneAndInline(ILogicalExpression expression) throws CompilationException {
+        switch (expression.getExpressionTag()) {
+            case CONSTANT:
+                return expression;
+            case FUNCTION_CALL:
+                return cloneAndInlineFunction(expression);
+            case VARIABLE:
+                LogicalVariable variable = ((VariableReferenceExpression) expression).getVariableReference();
+                DefineDescriptor defineDescriptor = defineChain.get(variable);
+                if (defineDescriptor == null || defineDescriptor.isScanDefinition()) {
+                    // Either no definition was recorded for the variable (nothing to inline) or we reached a
+                    // variable produced by a scan
+                    return expression;
+                }
+                return cloneAndInline(defineDescriptor.getExpression());
+            default:
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, expression.getSourceLocation());
+        }
+    }
+
+    /**
+     * Clones the function call and replaces each argument of the clone with its inlined form.
+     */
+    private ILogicalExpression cloneAndInlineFunction(ILogicalExpression expression) throws CompilationException {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression.cloneExpression();
+        for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
+            arg.setValue(cloneAndInline(arg.getValue()));
+        }
+        return funcExpr;
+    }
+
+    private static Set<LogicalOperatorTag> getScopeOperators() {
+        return EnumSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.GROUP,
+                LogicalOperatorTag.AGGREGATE, LogicalOperatorTag.WINDOW);
+    }
+
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
new file mode 100644
index 0000000..601b7d5
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+
+/**
+ * Common state of the define and use descriptors: the operator and expression they refer to, the index of the
+ * expression and the scope value they were created with. Only meant to be used as a base class.
+ */
+abstract class AbstractDescriptor {
+    protected final int scope;
+    protected final ILogicalOperator operator;
+    protected final ILogicalExpression expression;
+    protected final int expressionIndex;
+
+    protected AbstractDescriptor(int scope, ILogicalOperator operator, ILogicalExpression expression,
+            int expressionIndex) {
+        this.scope = scope;
+        this.operator = operator;
+        this.expression = expression;
+        this.expressionIndex = expressionIndex;
+    }
+
+    /**
+     * @return the operator this descriptor was created for
+     */
+    public ILogicalOperator getOperator() {
+        return operator;
+    }
+
+    /**
+     * @return the expression this descriptor was created for
+     */
+    public ILogicalExpression getExpression() {
+        return expression;
+    }
+
+    /**
+     * @return the expression index supplied at construction
+     */
+    public int getExpressionIndex() {
+        return expressionIndex;
+    }
+
+    /**
+     * @return the scope value supplied at construction
+     */
+    public int getScope() {
+        return scope;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
new file mode 100644
index 0000000..e263e7f
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+/**
+ * Describes the definition of a variable: the variable together with the operator and expression that
+ * produce it.
+ */
+public class DefineDescriptor extends AbstractDescriptor {
+    private final LogicalVariable variable;
+
+    public DefineDescriptor(int scope, LogicalVariable variable, ILogicalOperator operator,
+            ILogicalExpression expression, int expressionIndex) {
+        super(scope, operator, expression, expressionIndex);
+        this.variable = variable;
+    }
+
+    /**
+     * @return the defined variable
+     */
+    public LogicalVariable getVariable() {
+        return variable;
+    }
+
+    /**
+     * @return {@code false}; subclasses describing a scan definition override this
+     */
+    public boolean isScanDefinition() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(operator.getOperatorTag()).append(": [");
+        builder.append(variable).append("<-").append(expression);
+        return builder.append(']').toString();
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
new file mode 100644
index 0000000..a8246cb
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+
+/**
+ * Definition descriptor for the variables produced by a scan. In addition to the record variable (exposed
+ * as {@link #getVariable()}), it keeps the dataset, the primary key and meta variables, and the pushdown
+ * state (expected schemas and filter expressions) that is attached to the scan through the setters.
+ */
+public class ScanDefineDescriptor extends DefineDescriptor {
+    private final Dataset dataset;
+    private final List<LogicalVariable> primaryKeyVariables;
+    private final LogicalVariable metaRecordVariable;
+    private final Map<ILogicalExpression, ARecordType> paths;
+    private final Map<String, FunctionCallInformation> pathLocations;
+    private RootExpectedSchemaNode recordNode;
+    private RootExpectedSchemaNode metaNode;
+    private ILogicalExpression filterExpression;
+    private ILogicalExpression rangeFilterExpression;
+
+    /**
+     * Creates a descriptor whose expected record schema (and meta schema, when a meta variable is given) is
+     * initially {@link RootExpectedSchemaNode#ALL_FIELDS_ROOT_NODE}. The descriptor is created without a
+     * defining expression: its expression is {@code null} and its expression index is -1.
+     *
+     * @param scope               scope value recorded for the scan
+     * @param dataset             dataset of the scan
+     * @param primaryKeyVariables primary key variables of the scan
+     * @param recordVariable      record variable of the scan
+     * @param metaRecordVariable  meta record variable, or {@code null} if there is none
+     * @param operator            the scan operator
+     */
+    public ScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
+            LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
+        super(scope, recordVariable, operator, null, -1);
+        this.primaryKeyVariables = primaryKeyVariables;
+        this.metaRecordVariable = metaRecordVariable;
+        this.dataset = dataset;
+        paths = new HashMap<>();
+        pathLocations = new HashMap<>();
+
+        recordNode = RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE;
+        // Check the field directly: hasMeta() is overridable and must not be invoked from a constructor
+        if (metaRecordVariable != null) {
+            metaNode = RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE;
+        }
+    }
+
+    @Override
+    public boolean isScanDefinition() {
+        return true;
+    }
+
+    public Dataset getDataset() {
+        return dataset;
+    }
+
+    public List<LogicalVariable> getPrimaryKeyVariables() {
+        return primaryKeyVariables;
+    }
+
+    /**
+     * @return {@code true} if the descriptor was created with a non-null meta record variable
+     */
+    public boolean hasMeta() {
+        return metaRecordVariable != null;
+    }
+
+    public LogicalVariable getMetaRecordVariable() {
+        return metaRecordVariable;
+    }
+
+    public void setRecordNode(RootExpectedSchemaNode recordNode) {
+        this.recordNode = recordNode;
+    }
+
+    public RootExpectedSchemaNode getRecordNode() {
+        return recordNode;
+    }
+
+    public void setMetaNode(RootExpectedSchemaNode metaNode) {
+        this.metaNode = metaNode;
+    }
+
+    public RootExpectedSchemaNode getMetaNode() {
+        return metaNode;
+    }
+
+    public Map<ILogicalExpression, ARecordType> getFilterPaths() {
+        return paths;
+    }
+
+    public Map<String, FunctionCallInformation> getPathLocations() {
+        return pathLocations;
+    }
+
+    public void setFilterExpression(ILogicalExpression expression) {
+        this.filterExpression = expression;
+    }
+
+    public ILogicalExpression getFilterExpression() {
+        return filterExpression;
+    }
+
+    public void setRangeFilterExpression(ILogicalExpression rangeFilterExpression) {
+        this.rangeFilterExpression = rangeFilterExpression;
+    }
+
+    public ILogicalExpression getRangeFilterExpression() {
+        return rangeFilterExpression;
+    }
+
+    /**
+     * Renders the scanned variables and the operator tag followed by three lines: the projected schema(s), the
+     * filter and the range filter. A line is left empty when the corresponding piece is absent.
+     */
+    @Override
+    public String toString() {
+        ExpectedSchemaNodeToIATypeTranslatorVisitor converter =
+                new ExpectedSchemaNodeToIATypeTranslatorVisitor(new HashMap<>());
+        SimpleStringBuilderForIATypeVisitor typeStringVisitor = new SimpleStringBuilderForIATypeVisitor();
+        StringBuilder builder = new StringBuilder();
+
+        AbstractScanOperator scanOp = (AbstractScanOperator) operator;
+        builder.append("[record: ");
+        builder.append(getVariable());
+        if (hasMeta()) {
+            builder.append(", meta: ");
+            builder.append(metaRecordVariable);
+        }
+        builder.append("] <- ");
+        builder.append(scanOp.getOperatorTag());
+
+        builder.append('\n');
+        boolean fieldAccessPushdown = DatasetUtil.isFieldAccessPushdownSupported(dataset);
+        if (fieldAccessPushdown && !recordNode.isAllFields()) {
+            builder.append("project: ");
+            ARecordType recordType = (ARecordType) recordNode.accept(converter, "root");
+            recordType.accept(typeStringVisitor, builder);
+
+            if (hasMeta()) {
+                builder.append(" project-meta: ");
+                ARecordType metaType = (ARecordType) metaNode.accept(converter, "meta");
+                metaType.accept(typeStringVisitor, builder);
+            }
+        }
+
+        builder.append('\n');
+        if (filterExpression != null) {
+            builder.append("filter: ");
+            builder.append(filterExpression);
+        }
+
+        builder.append('\n');
+        if (rangeFilterExpression != null) {
+            builder.append("range-filter: ");
+            builder.append(rangeFilterExpression);
+        }
+
+        return builder.toString();
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
new file mode 100644
index 0000000..7502013
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+/**
+ * Describes a use: an expression of an operator together with the variables it references and, optionally,
+ * the variable it produces.
+ */
+public class UseDescriptor extends AbstractDescriptor {
+    private final Set<LogicalVariable> usedVariables = new HashSet<>();
+    private final LogicalVariable producedVariable;
+
+    public UseDescriptor(int scope, ILogicalOperator operator, ILogicalExpression expression, int expressionIndex,
+            LogicalVariable producedVariable) {
+        super(scope, operator, expression, expressionIndex);
+        this.producedVariable = producedVariable;
+    }
+
+    /**
+     * @return the internal, mutable set of used variables; it is empty right after construction
+     */
+    public Set<LogicalVariable> getUsedVariables() {
+        return usedVariables;
+    }
+
+    /**
+     * @return the produced variable supplied at construction
+     */
+    public LogicalVariable getProducedVariable() {
+        return producedVariable;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append(operator.getOperatorTag()).append(": [").append(expression);
+        return builder.append(']').toString();
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/DefUseChainComputerVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/DefUseChainComputerVisitor.java
new file mode 100644
index 0000000..5fd8f1e
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/DefUseChainComputerVisitor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
+
+import java.util.List;
+
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Expression visitor that feeds a {@link PushdownContext} with the uses and definitions found in the
+ * expressions it is given. {@link #init} sets the operator and produced variables used for those expressions.
+ */
+public class DefUseChainComputerVisitor implements ILogicalExpressionReferenceTransform {
+    private final PushdownContext pushdownContext;
+    private ILogicalOperator operator;
+    private List<LogicalVariable> producedVariables;
+    private int expressionIndex;
+
+    public DefUseChainComputerVisitor(PushdownContext pushdownContext) {
+        this.pushdownContext = pushdownContext;
+    }
+
+    /**
+     * Prepares the visitor for the expressions of the given operator and restarts the expression index at 0.
+     *
+     * @param operator          operator whose expressions are about to be visited
+     * @param producedVariables variables produced by the expressions (the i-th visited expression is paired
+     *                          with the i-th variable), or {@code null} if no variables are produced
+     */
+    public void init(ILogicalOperator operator, List<LogicalVariable> producedVariables) {
+        expressionIndex = 0;
+        this.operator = operator;
+        this.producedVariables = producedVariables;
+    }
+
+    /**
+     * Records the use of the expression and, if it produces a variable, the definition of that variable.
+     *
+     * @return always {@code false}; the expression reference is never replaced
+     */
+    @Override
+    public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+        ILogicalExpression expression = exprRef.getValue();
+        LogicalVariable producedVariable = null;
+        if (producedVariables != null) {
+            producedVariable = producedVariables.get(expressionIndex);
+        }
+
+        // The use chain is computed before the produced variable is defined
+        pushdownContext.use(operator, expression, expressionIndex, producedVariable);
+        if (producedVariable != null) {
+            pushdownContext.define(producedVariable, operator, expression, expressionIndex);
+        }
+
+        expressionIndex++;
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
similarity index 98%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
index 32e4b18..825db1e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.optimizer.rules.pushdown;
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
import java.util.ArrayList;
import java.util.List;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
new file mode 100644
index 0000000..65b131a
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.config.DatasetConfig.DatasetFormat;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
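+ * This visitor visits the entire plan, registers the scanned datasets, and collects the definitions and uses of the
+ * plan's expressions, i.e., the information needed to determine the required values from all datasets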
+ */
+public class PushdownOperatorVisitor implements ILogicalOperatorVisitor<Void, Void> {
+ private static final List<LogicalVariable> EMPTY_VARIABLES = Collections.emptyList();
+ private final PushdownContext pushdownContext;
+ private final IOptimizationContext context;
+ private final DefUseChainComputerVisitor defUseComputer;
+ private final Set<ILogicalOperator> visitedOperators;
+
+ /**
+ * Creates a visitor that reports the scans and the def-use information it finds to the given pushdown context.
+ *
+ * @param pushdownContext context that receives the registered scans, scopes, and def-use information
+ * @param context optimization context; it provides the metadata provider used to look up datasets
+ */
+ public PushdownOperatorVisitor(PushdownContext pushdownContext, IOptimizationContext context) {
+ this.pushdownContext = pushdownContext;
+ this.context = context;
+ this.visitedOperators = new HashSet<>();
+ this.defUseComputer = new DefUseChainComputerVisitor(pushdownContext);
+ }
+
+ /**
+ * Visits every input of an operator first and then hands the operator's own expressions to the def-use computer.
+ * An operator that was already visited is skipped, so each operator is processed at most once.
+ *
+ * @param op the operator to process
+ * @param producedVariables the variables produced by the operator's expressions, or {@code null} if the caller
+ * does not report any produced variable for this operator
+ */
+ private void visitInputs(ILogicalOperator op, List<LogicalVariable> producedVariables) throws AlgebricksException {
+ if (visitedOperators.contains(op)) {
+ // Already processed (NOTE(review): presumably reachable when an operator is shared by several parents)
+ return;
+ }
+ // Process the inputs first so that this operator is handled only after everything below it
+ for (Mutable<ILogicalOperator> child : op.getInputs()) {
+ child.getValue().accept(this, null);
+ }
+ visitedOperators.add(op);
+ // Enter a scope (a new stage) for operators like GROUP and JOIN
+ pushdownContext.enterScope(op);
+ // Point the def-use computer at this operator, then run it over each of the operator's expressions
+ defUseComputer.init(op, producedVariables);
+ op.acceptExpressionTransform(defUseComputer);
+ }
+
+ /*
+ * ******************************************************************************
+ * Operators that need to handle special cases
+ * ******************************************************************************
+ */
+
+ /**
+ * Visits the inputs and then lets a directly underlying data-scan or unnest-map return empty records if the
+ * project does not retain the scanned record (see {@link #setEmptyRecord(ILogicalOperator, List)}).
+ */
+ @Override
+ public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ //Set as empty records for data-scan or unnest-map if certain variables are projected out
+ setEmptyRecord(op.getInputs().get(0).getValue(), op.getVariables());
+ return null;
+ }
+
+ /**
+ * From the {@link DataSourceScanOperator}, we need to register the payload variable (record variable) to check
+ * which expression in the plan is using it. The scan is registered only if its datasource is applicable for
+ * pushdown (see {@link #getDatasetDataSourceIfApplicable(DataSource)}), and it is registered before the
+ * operator's inputs and expressions are visited.
+ */
+ @Override
+ public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable((DataSource) op.getDataSource());
+ registerDatasetIfApplicable(datasetDataSource, op);
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * From the {@link UnnestMapOperator}, we need to register the payload variable (record variable) to check
+ * which expression in the plan is using it. The operator is registered only if a datasource can be resolved
+ * from it and that datasource is applicable for pushdown. Unlike the data-scan, the registration happens after
+ * the operator's inputs and expressions are visited.
+ */
+ @Override
+ public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(getDataSourceFromUnnestMapOperator(op));
+ registerDatasetIfApplicable(datasetDataSource, op);
+ return null;
+ }
+
+ /**
+ * Visits the inputs and reports the aggregate's variables as the variables produced by its expressions.
+ * Additionally, a non-global aggregate that only counts a constant lets a directly underlying data-scan or
+ * unnest-map return empty records.
+ */
+ @Override
+ public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ if (!op.isGlobal() && isCountConstant(op.getExpressions())) {
+ /*
+ * Optimize the SELECT COUNT(*) case
+ * It is local aggregate and has agg-sql-count function with a constant argument. Set empty record if the
+ * input operator is a DataSourceScanOperator or an UnnestMapOperator
+ */
+ setEmptyRecord(op.getInputs().get(0).getValue(), EMPTY_VARIABLES);
+ }
+ return null;
+ }
+
+ /**
+ * Visits the inputs and reports the assign's variables as the variables produced by its expressions.
+ */
+ @Override
+ public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ return null;
+ }
+
+ /*
+ * ******************************************************************************
+ * Helper methods
+ * ******************************************************************************
+ */
+
+ /**
+ * Returns the datasource as a {@link DatasetDataSource} if pushdown is applicable to it.
+ *
+ * @param dataSource the datasource to check (can be {@code null})
+ * @return the given datasource cast to {@link DatasetDataSource}, or {@code null} if the datasource is
+ * {@code null}, is a sample, belongs to an external dataset whose properties do not support pushdown, or
+ * belongs to an internal dataset stored in the row format
+ */
+ private DatasetDataSource getDatasetDataSourceIfApplicable(DataSource dataSource) throws AlgebricksException {
+ if (dataSource == null || dataSource.getDatasourceType() == DataSource.Type.SAMPLE) {
+ return null;
+ }
+
+ Dataset dataset = getDataset(dataSource);
+ DatasetConfig.DatasetType datasetType = dataset.getDatasetType();
+ if (datasetType == DatasetConfig.DatasetType.EXTERNAL) {
+ ExternalDatasetDetails externalDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ if (!ExternalDataUtils.supportsPushdown(externalDetails.getProperties())) {
+ //The properties of the external dataset do not support pushdown
+ return null;
+ }
+ } else if (datasetType == DatasetConfig.DatasetType.INTERNAL
+ && dataset.getDatasetFormatInfo().getFormat() == DatasetFormat.ROW) {
+ //Internal datasets stored in the row format are excluded
+ return null;
+ }
+
+ return (DatasetDataSource) dataSource;
+ }
+
+ /**
+ * Looks up the {@link Dataset} of a datasource through the metadata provider of the optimization context.
+ *
+ * @param dataSource the datasource whose id provides the dataverse name and the datasource name
+ * @return whatever {@code MetadataProvider.findDataset} returns for these two names
+ */
+ private Dataset getDataset(DataSource dataSource) throws AlgebricksException {
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ return metadataProvider.findDataset(dataSource.getId().getDataverseName(),
+ dataSource.getId().getDatasourceName());
+ }
+
+ /**
+ * Finds the datasource accessed by an {@link UnnestMapOperator} using the constant string arguments of the
+ * operator's function call.
+ *
+ * @param unnest unnest map operator
+ * @return the datasource found for the dataverse (argument 2) and the dataset (argument 3), or {@code null} if
+ * one of these names is not available as a string or if argument 0 differs from the dataset name
+ */
+ private DataSource getDataSourceFromUnnestMapOperator(UnnestMapOperator unnest) throws AlgebricksException {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) unnest.getExpressionRef().getValue();
+ // NOTE(review): argument 0 is presumably the name of the searched index, which makes the comparison below
+ // a check that the dataset itself (and not one of its other indexes) is accessed -- confirm
+ String indexName = ConstantExpressionUtil.getStringArgument(funcExpr, 0);
+ String dataverse = ConstantExpressionUtil.getStringArgument(funcExpr, 2);
+ String dataset = ConstantExpressionUtil.getStringArgument(funcExpr, 3);
+ // An argument that is not a string constant may come back as null; report "no datasource" in that case
+ // instead of failing with a NullPointerException (callers already handle a null datasource)
+ if (dataverse == null || dataset == null || !dataset.equals(indexName)) {
+ return null;
+ }
+
+ DataSourceId dsid = new DataSourceId(DataverseName.createFromCanonicalForm(dataverse), dataset);
+ MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+ return metadataProvider.findDataSource(dsid);
+ }
+
+ /**
+ * Registers a scan in the pushdown context together with its primary-key, record, and meta variables.
+ *
+ * @param datasetDataSource the datasource of the scan; nothing is registered if it is {@code null}
+ * @param op the scan operator whose variables are mapped to primary keys, record, and meta
+ */
+ private void registerDatasetIfApplicable(DatasetDataSource datasetDataSource, AbstractScanOperator op)
+ throws AlgebricksException {
+ if (datasetDataSource == null) {
+ //Not applicable for pushdown
+ return;
+ }
+ Dataset scannedDataset = getDataset(datasetDataSource);
+ List<LogicalVariable> keyVariables = datasetDataSource.getPrimaryKeyVariables(op.getVariables());
+ LogicalVariable recordVariable = datasetDataSource.getDataRecordVariable(op.getVariables());
+ LogicalVariable metaVariable = datasetDataSource.getMetaVariable(op.getVariables());
+ pushdownContext.registerScan(scannedDataset, keyVariables, recordVariable, metaVariable, op);
+ }
+
+ /**
+ * Lets a scan return empty records (and empty meta records) when the operators above it do not need them.
+ * Nothing is done unless {@code inputOp} is a {@link DataSourceScanOperator} or an {@link UnnestMapOperator}
+ * whose datasource is applicable for pushdown.
+ *
+ * @param inputOp an operator that is potentially a {@link DataSourceScanOperator} or a {@link
+ * UnnestMapOperator}
+ * @param retainedVariables variables that should be retained
+ * @see #visitAggregateOperator(AggregateOperator, Void)
+ * @see #visitProjectOperator(ProjectOperator, Void)
+ */
+ private void setEmptyRecord(ILogicalOperator inputOp, List<LogicalVariable> retainedVariables)
+ throws AlgebricksException {
+ LogicalOperatorTag inputTag = inputOp.getOperatorTag();
+ DataSource source;
+ List<LogicalVariable> scanVariables;
+ Mutable<ILogicalExpression> condition;
+ if (inputTag == LogicalOperatorTag.DATASOURCESCAN) {
+ DataSourceScanOperator scanOp = (DataSourceScanOperator) inputOp;
+ source = (DataSource) scanOp.getDataSource();
+ scanVariables = scanOp.getVariables();
+ condition = scanOp.getSelectCondition();
+ } else if (inputTag == LogicalOperatorTag.UNNEST_MAP) {
+ UnnestMapOperator unnestMapOp = (UnnestMapOperator) inputOp;
+ source = getDataSourceFromUnnestMapOperator(unnestMapOp);
+ scanVariables = unnestMapOp.getVariables();
+ condition = unnestMapOp.getSelectCondition();
+ } else {
+ //Neither a data-scan nor an unnest-map
+ return;
+ }
+
+ DatasetDataSource datasetSource = getDatasetDataSourceIfApplicable(source);
+ if (datasetSource == null) {
+ //Does not support pushdown
+ return;
+ }
+
+ //Collect the variables used by the scan's select condition (if it has one)
+ Set<LogicalVariable> conditionVariables = new HashSet<>();
+ if (condition != null) {
+ condition.getValue().getUsedVariables(conditionVariables);
+ }
+
+ LogicalVariable recordVariable = datasetSource.getDataRecordVariable(scanVariables);
+ ScanDefineDescriptor descriptor = (ScanDefineDescriptor) pushdownContext.getDefineDescriptor(recordVariable);
+
+ /*
+ * The record is needed if an upper operator retains it or if the select condition uses it. Otherwise, set
+ * the root node as EMPTY_ROOT_NODE (i.e., no fields will be read from disk) so that pushdowns are skipped
+ * for the empty node and an empty record is returned instead of the entire record.
+ */
+ boolean recordNeeded =
+ retainedVariables.contains(recordVariable) || conditionVariables.contains(recordVariable);
+ if (!recordNeeded) {
+ descriptor.setRecordNode(RootExpectedSchemaNode.EMPTY_ROOT_NODE);
+ }
+
+ //Do the same for meta (only the retained variables are checked for the meta record)
+ if (descriptor.hasMeta() && !retainedVariables.contains(descriptor.getMetaRecordVariable())) {
+ descriptor.setMetaNode(RootExpectedSchemaNode.EMPTY_ROOT_NODE);
+ }
+ }
+
+ /**
+ * Checks whether the given expressions consist of exactly one call to {@code BuiltinFunctions.SQL_COUNT} whose
+ * first argument is a constant.
+ *
+ * @param expressions the expressions of an aggregate operator
+ * @return {@code true} only for a single SQL_COUNT call over a constant; {@code false} otherwise
+ */
+ private boolean isCountConstant(List<Mutable<ILogicalExpression>> expressions) {
+ if (expressions.size() == 1) {
+ ILogicalExpression candidate = expressions.get(0).getValue();
+ if (candidate.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression call = (AbstractFunctionCallExpression) candidate;
+ FunctionIdentifier functionId = call.getFunctionIdentifier();
+ if (BuiltinFunctions.SQL_COUNT.equals(functionId)) {
+ ILogicalExpression firstArgument = call.getArguments().get(0).getValue();
+ return firstArgument.getExpressionTag() == LogicalExpressionTag.CONSTANT;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Visits every root operator of every nested plan with this visitor.
+ *
+ * @param nestedPlans the nested plans of an operator (e.g., of a subplan, group-by, or window operator)
+ */
+ private void visitSubplans(List<ILogicalPlan> nestedPlans) throws AlgebricksException {
+ for (ILogicalPlan nestedPlan : nestedPlans) {
+ for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+ ILogicalOperator rootOp = rootRef.getValue();
+ rootOp.accept(this, null);
+ }
+ }
+ }
+
+ /*
+ * ******************************************************************************
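+ * Remaining operators: no special handling beyond visiting their inputs (and nested plans) and collecting the
+ * definitions and uses of their expressions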
+ * ******************************************************************************
+ */
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * Visits the inputs of the subplan operator first and then the roots of its nested plans.
+ */
+ @Override
+ public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ visitSubplans(op.getNestedPlans());
+ return null;
+ }
+
+ /**
+ * Visits the inputs and reports the unnest's variables as the variables produced by its expressions.
+ */
+ @Override
+ public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ return null;
+ }
+
+ /**
+ * Visits the inputs and reports the running-aggregate's variables as the variables produced by its expressions.
+ */
+ @Override
+ public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ return null;
+ }
+
+ /**
+ * Nothing to do: the operator is neither visited further nor passed to the def-use computer.
+ */
+ @Override
+ public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ /**
+ * Visits the inputs and reports the variables returned by {@code op.getVariables()} as the variables produced
+ * by the group-by's expressions. The roots of the nested plans are visited afterwards.
+ */
+ @Override
+ public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op, op.getVariables());
+ visitSubplans(op.getNestedPlans());
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits both inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits both inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs (if any) and passes the operator to the def-use computer.
+ */
+ @Override
+ public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * Visits the inputs and passes the operator's expressions to the def-use computer without produced variables.
+ * NOTE(review): {@link #visitUnnestOperator(UnnestOperator, Void)} reports the unnest's variables, whereas this
+ * method does not -- confirm that the difference is intended.
+ */
+ @Override
+ public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * Visits the inputs and passes the operator's expressions to the def-use computer.
+ * NOTE(review): unlike {@link #visitUnnestMapOperator(UnnestMapOperator, Void)}, no dataset is registered for
+ * this operator -- confirm that the difference is intended.
+ */
+ @Override
+ public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * No special handling: visits the inputs and passes the operator's expressions to the def-use computer.
+ */
+ @Override
+ public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ return null;
+ }
+
+ /**
+ * Visits the inputs of the window operator first and then the roots of its nested plans.
+ * NOTE(review): no produced variables are reported for the window operator -- confirm that this is intended.
+ */
+ @Override
+ public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+ visitInputs(op);
+ visitSubplans(op.getNestedPlans());
+ return null;
+ }
+
+ /**
+ * Same as {@link #visitInputs(ILogicalOperator, List)} for an operator with no reported produced variables.
+ *
+ * @param op the operator to process
+ */
+ private void visitInputs(ILogicalOperator op) throws AlgebricksException {
+ visitInputs(op, null);
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 277602d..06e949f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -44,6 +44,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -54,6 +55,7 @@
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.NodeGroup;
@@ -682,4 +684,28 @@
public static boolean isNotView(Dataset dataset) {
return dataset.getDatasetType() != DatasetType.VIEW;
}
+
+ /**
+ * @return {@code true} for an internal dataset in the column format and for an external dataset whose
+ * properties support pushdown; {@code false} for everything else
+ */
+ public static boolean isFieldAccessPushdownSupported(Dataset dataset) {
+ DatasetType type = dataset.getDatasetType();
+ if (type == DatasetType.EXTERNAL) {
+ ExternalDatasetDetails externalDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ return ExternalDataUtils.supportsPushdown(externalDetails.getProperties());
+ }
+ return type == DatasetType.INTERNAL
+ && dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
+ }
+
+ /**
+ * @return {@code true} only for an internal dataset in the column format
+ */
+ public static boolean isFilterPushdownSupported(Dataset dataset) {
+ // TODO add external dataset with dynamic prefixes
+ boolean internal = dataset.getDatasetType() == DatasetType.INTERNAL;
+ return internal && dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
+ }
+
+ /**
+ * @return {@code true} only for an internal dataset in the column format
+ */
+ public static boolean isRangeFilterPushdownSupported(Dataset dataset) {
+ if (dataset.getDatasetType() != DatasetType.INTERNAL) {
+ return false;
+ }
+ return dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ProjectionFiltrationUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ProjectionFiltrationUtil.java
new file mode 100644
index 0000000..6f2d5a4
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ProjectionFiltrationUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+
+/**
+ * Holds the default record types used when requesting all or none of the fields. Not meant to be instantiated.
+ */
+public class ProjectionFiltrationUtil {
+ //Default open record type when requesting all of the fields
+ public static final ARecordType ALL_FIELDS_TYPE = createType("");
+ //Default open record type when requesting none of the fields
+ public static final ARecordType EMPTY_TYPE = createType("{}");
+
+ private ProjectionFiltrationUtil() {
+ }
+
+ /**
+ * Creates a record type that has the given name and declares no fields.
+ */
+ private static ARecordType createType(String typeName) {
+ String[] noFieldNames = {};
+ IAType[] noFieldTypes = {};
+ return new ARecordType(typeName, noFieldNames, noFieldTypes, true);
+ }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index 9553913..ced6371 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -89,6 +89,9 @@
return gByList;
}
+ /**
+ * @return the variables of the group-by keys
+ */
public List<LogicalVariable> getGroupByVarList() {
List<LogicalVariable> varList = new ArrayList<>(gByList.size());
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {
@@ -132,6 +135,18 @@
}
}
+ /**
+ * Lists the first component of every group-by-list pair followed by the first component of every decor-list
+ * pair, in list order.
+ * Note: the list may contain null values -- as some decor expressions may not be assigned to variables
+ *
+ * @return all variables produced by the group-by operator (group-by-list followed by decor-list)
+ * @see #getProducedVariablesExceptNestedPlans(Collection) to get a collection of all variables without nulls
+ */
+ public List<LogicalVariable> getVariables() {
+ List<LogicalVariable> variables = new ArrayList<>(gByList.size() + decorList.size());
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> groupByPair : gByList) {
+ variables.add(groupByPair.getFirst());
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : decorList) {
+ variables.add(decorPair.getFirst());
+ }
+ return variables;
+ }
+
@Override
public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) {
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {