[ASTERIXDB-3229][COMP] Part 1: Introduce DefUseChain computer

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
To generalize the pushdown effort to include
dynamic prefixes, we need to have a general approach
for handling pushdowns (e.g., value-access pushdown,
range-filter pushdown, predicate pushdown). Each one
of those pushdowns requires certain information about
expressions like where they were defined and where they
were used. This patch introduces a way to collect such
information.

Change-Id: I985a779fe74a57cccbe1e625cec4ff69cda84924
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17655
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
index 36a77c9..edebed9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
 import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
 import org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
 import org.apache.asterix.runtime.projection.DataProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.commons.lang3.mutable.Mutable;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
new file mode 100644
index 0000000..878d4b1
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PushdownContext {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final Set<LogicalOperatorTag> SCOPE_OPERATORS = getScopeOperators();
+    private final List<ScanDefineDescriptor> registeredScans;
+    // For debugging purposes only
+    private final Map<ILogicalExpression, DefineDescriptor> definedVariable;
+    private final Map<LogicalVariable, DefineDescriptor> defineChain;
+    private final Map<LogicalVariable, List<UseDescriptor>> useChain;
+    private final List<ILogicalOperator> scopes;
+    private final Map<ILogicalOperator, ILogicalExpression> inlinedCache;
+
+    public PushdownContext() {
+        registeredScans = new ArrayList<>();
+        this.definedVariable = new HashMap<>();
+        this.defineChain = new HashMap<>();
+        this.useChain = new HashMap<>();
+        scopes = new ArrayList<>();
+        inlinedCache = new HashMap<>();
+    }
+
+    public void enterScope(ILogicalOperator operator) {
+        if (SCOPE_OPERATORS.contains(operator.getOperatorTag())) {
+            scopes.add(operator);
+        }
+    }
+
+    public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
+            LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
+        ScanDefineDescriptor scanDefDesc =
+                new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
+        defineChain.put(recordVariable, scanDefDesc);
+        useChain.put(recordVariable, new ArrayList<>());
+        if (metaVariable != null) {
+            defineChain.put(metaVariable, scanDefDesc);
+            useChain.put(metaVariable, new ArrayList<>());
+        }
+        for (LogicalVariable pkVar : pkList) {
+            defineChain.put(pkVar, scanDefDesc);
+            useChain.put(pkVar, new ArrayList<>());
+        }
+        registeredScans.add(scanDefDesc);
+    }
+
+    public void define(LogicalVariable variable, ILogicalOperator operator, ILogicalExpression expression,
+            int expressionIndex) {
+        if (defineChain.containsKey(variable)) {
+            LOGGER.warn("Variable {}  declared twice", variable);
+            return;
+        } else if (definedVariable.containsKey(expression)) {
+            DefineDescriptor defineDescriptor = definedVariable.get(expression);
+            /*
+             * Currently, we know that scan-collection of some constant array can appear multiple times as REPLICATE
+             * wasn't fired yet to remove common operators. However, this log is to track any issue could occur due to
+             * having redundant expressions declared in different operators.
+             */
+            LOGGER.debug("Expression {} is redundant. It was seen at {}", expression, defineDescriptor.toString());
+        }
+
+        int scope = scopes.size();
+        DefineDescriptor defineDescriptor =
+                new DefineDescriptor(scope, variable, operator, expression, expressionIndex);
+        definedVariable.put(expression, defineDescriptor);
+        defineChain.put(variable, defineDescriptor);
+        useChain.put(variable, new ArrayList<>());
+    }
+
+    public void use(ILogicalOperator operator, ILogicalExpression expression, int expressionIndex,
+            LogicalVariable producedVariable) {
+        int scope = scopes.size();
+        UseDescriptor useDescriptor = new UseDescriptor(scope, operator, expression, expressionIndex, producedVariable);
+        Set<LogicalVariable> usedVariables = useDescriptor.getUsedVariables();
+        expression.getUsedVariables(usedVariables);
+        for (LogicalVariable variable : usedVariables) {
+            DefineDescriptor defineDescriptor = defineChain.get(variable);
+            if (defineDescriptor == null) {
+                // Log to track any definition that we may have missed
+                LOGGER.warn("Variable {} is not defined", variable);
+                return;
+            }
+
+            List<UseDescriptor> uses = useChain.get(variable);
+            uses.add(useDescriptor);
+        }
+    }
+
+    public DefineDescriptor getDefineDescriptor(LogicalVariable variable) {
+        return defineChain.get(variable);
+    }
+
+    public DefineDescriptor getDefineDescriptor(UseDescriptor useDescriptor) {
+        LogicalVariable producedVariable = useDescriptor.getProducedVariable();
+        if (producedVariable == null) {
+            return null;
+        }
+        return getDefineDescriptor(producedVariable);
+    }
+
+    public List<UseDescriptor> getUseDescriptors(DefineDescriptor defineDescriptor) {
+        return useChain.get(defineDescriptor.getVariable());
+    }
+
+    public List<ScanDefineDescriptor> getRegisteredScans() {
+        return registeredScans;
+    }
+
+    public ILogicalExpression cloneAndInlineExpression(UseDescriptor useDescriptor) throws CompilationException {
+        ILogicalOperator op = useDescriptor.getOperator();
+        ILogicalExpression inlinedExpr = inlinedCache.get(op);
+        if (inlinedExpr == null) {
+            inlinedExpr = cloneAndInline(useDescriptor.getExpression());
+            inlinedCache.put(op, inlinedExpr);
+        }
+
+        // Clone the cached expression as a processor may change it
+        return inlinedExpr.cloneExpression();
+    }
+
+    private ILogicalExpression cloneAndInline(ILogicalExpression expression) throws CompilationException {
+        switch (expression.getExpressionTag()) {
+            case CONSTANT:
+                return expression;
+            case FUNCTION_CALL:
+                return cloneAndInlineFunction(expression);
+            case VARIABLE:
+                LogicalVariable variable = ((VariableReferenceExpression) expression).getVariableReference();
+                DefineDescriptor defineDescriptor = defineChain.get(variable);
+                if (defineDescriptor.isScanDefinition()) {
+                    // Reached the recordVariable
+                    return expression;
+                }
+                return cloneAndInline(defineDescriptor.getExpression());
+            default:
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, expression.getSourceLocation());
+        }
+    }
+
+    private ILogicalExpression cloneAndInlineFunction(ILogicalExpression expression) throws CompilationException {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression.cloneExpression();
+        for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
+            arg.setValue(cloneAndInline(arg.getValue()));
+        }
+        return funcExpr;
+    }
+
+    private static Set<LogicalOperatorTag> getScopeOperators() {
+        return EnumSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.GROUP,
+                LogicalOperatorTag.AGGREGATE, LogicalOperatorTag.WINDOW);
+    }
+
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
new file mode 100644
index 0000000..601b7d5
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+
+class AbstractDescriptor {
+    protected final int scope;
+    protected final ILogicalOperator operator;
+    protected final ILogicalExpression expression;
+    protected final int expressionIndex;
+
+    public AbstractDescriptor(int scope, ILogicalOperator operator, ILogicalExpression expression,
+            int expressionIndex) {
+        this.scope = scope;
+        this.operator = operator;
+        this.expression = expression;
+        this.expressionIndex = expressionIndex;
+    }
+
+    public ILogicalOperator getOperator() {
+        return operator;
+    }
+
+    public ILogicalExpression getExpression() {
+        return expression;
+    }
+
+    public int getExpressionIndex() {
+        return expressionIndex;
+    }
+
+    public int getScope() {
+        return scope;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
new file mode 100644
index 0000000..e263e7f
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public class DefineDescriptor extends AbstractDescriptor {
+    private final LogicalVariable variable;
+
+    public DefineDescriptor(int scope, LogicalVariable variable, ILogicalOperator operator,
+            ILogicalExpression expression, int expressionIndex) {
+        super(scope, operator, expression, expressionIndex);
+        this.variable = variable;
+    }
+
+    public LogicalVariable getVariable() {
+        return variable;
+    }
+
+    public boolean isScanDefinition() {
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return operator.getOperatorTag() + ": [" + variable + "<-" + expression + "]";
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
new file mode 100644
index 0000000..a8246cb
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+
+public class ScanDefineDescriptor extends DefineDescriptor {
+    private final Dataset dataset;
+    private final List<LogicalVariable> primaryKeyVariables;
+    private final LogicalVariable metaRecordVariable;
+    private final Map<ILogicalExpression, ARecordType> paths;
+    private final Map<String, FunctionCallInformation> pathLocations;
+    private RootExpectedSchemaNode recordNode;
+    private RootExpectedSchemaNode metaNode;
+    private ILogicalExpression filterExpression;
+    private ILogicalExpression rangeFilterExpression;
+
+    public ScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
+            LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
+        super(scope, recordVariable, operator, null, -1);
+        this.primaryKeyVariables = primaryKeyVariables;
+        this.metaRecordVariable = metaRecordVariable;
+        this.dataset = dataset;
+        paths = new HashMap<>();
+        pathLocations = new HashMap<>();
+
+        recordNode = RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE;
+        if (hasMeta()) {
+            metaNode = RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE;
+        }
+    }
+
+    @Override
+    public boolean isScanDefinition() {
+        return true;
+    }
+
+    public Dataset getDataset() {
+        return dataset;
+    }
+
+    public List<LogicalVariable> getPrimaryKeyVariables() {
+        return primaryKeyVariables;
+    }
+
+    public boolean hasMeta() {
+        return metaRecordVariable != null;
+    }
+
+    public LogicalVariable getMetaRecordVariable() {
+        return metaRecordVariable;
+    }
+
+    public void setRecordNode(RootExpectedSchemaNode recordNode) {
+        this.recordNode = recordNode;
+    }
+
+    public RootExpectedSchemaNode getRecordNode() {
+        return recordNode;
+    }
+
+    public void setMetaNode(RootExpectedSchemaNode metaNode) {
+        this.metaNode = metaNode;
+    }
+
+    public RootExpectedSchemaNode getMetaNode() {
+        return metaNode;
+    }
+
+    public Map<ILogicalExpression, ARecordType> getFilterPaths() {
+        return paths;
+    }
+
+    public Map<String, FunctionCallInformation> getPathLocations() {
+        return pathLocations;
+    }
+
+    public void setFilterExpression(ILogicalExpression expression) {
+        this.filterExpression = expression;
+    }
+
+    public ILogicalExpression getFilterExpression() {
+        return filterExpression;
+    }
+
+    public void setRangeFilterExpression(ILogicalExpression rangeFilterExpression) {
+        this.rangeFilterExpression = rangeFilterExpression;
+    }
+
+    public ILogicalExpression getRangeFilterExpression() {
+        return rangeFilterExpression;
+    }
+
+    @Override
+    public String toString() {
+        ExpectedSchemaNodeToIATypeTranslatorVisitor converter =
+                new ExpectedSchemaNodeToIATypeTranslatorVisitor(new HashMap<>());
+        SimpleStringBuilderForIATypeVisitor typeStringVisitor = new SimpleStringBuilderForIATypeVisitor();
+        StringBuilder builder = new StringBuilder();
+
+        AbstractScanOperator scanOp = (AbstractScanOperator) operator;
+        builder.append("[record: ");
+        builder.append(getVariable());
+        if (hasMeta()) {
+            builder.append(", meta: ");
+            builder.append(metaRecordVariable);
+        }
+        builder.append("] <- ");
+        builder.append(scanOp.getOperatorTag());
+
+        builder.append('\n');
+        boolean fieldAccessPushdown = DatasetUtil.isFieldAccessPushdownSupported(dataset);
+        if (fieldAccessPushdown && !recordNode.isAllFields()) {
+            builder.append("project: ");
+            ARecordType recordType = (ARecordType) recordNode.accept(converter, "root");
+            recordType.accept(typeStringVisitor, builder);
+
+            if (hasMeta()) {
+                builder.append(" project-meta: ");
+                ARecordType metaType = (ARecordType) metaNode.accept(converter, "meta");
+                metaType.accept(typeStringVisitor, builder);
+            }
+        }
+
+        builder.append('\n');
+        if (filterExpression != null) {
+            builder.append("filter: ");
+            builder.append(filterExpression);
+        }
+
+        builder.append('\n');
+        if (rangeFilterExpression != null) {
+            builder.append("range-filter: ");
+            builder.append(rangeFilterExpression);
+        }
+
+        return builder.toString();
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
new file mode 100644
index 0000000..7502013
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public class UseDescriptor extends AbstractDescriptor {
+    private final Set<LogicalVariable> usedVariables;
+    private final LogicalVariable producedVariable;
+
+    public UseDescriptor(int scope, ILogicalOperator operator, ILogicalExpression expression, int expressionIndex,
+            LogicalVariable producedVariable) {
+        super(scope, operator, expression, expressionIndex);
+        this.usedVariables = new HashSet<>();
+        this.producedVariable = producedVariable;
+    }
+
+    public Set<LogicalVariable> getUsedVariables() {
+        return usedVariables;
+    }
+
+    public LogicalVariable getProducedVariable() {
+        return producedVariable;
+    }
+
+    @Override
+    public String toString() {
+        return operator.getOperatorTag() + ": [" + expression + "]";
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/DefUseChainComputerVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/DefUseChainComputerVisitor.java
new file mode 100644
index 0000000..5fd8f1e
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/DefUseChainComputerVisitor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
+
+import java.util.List;
+
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+public class DefUseChainComputerVisitor implements ILogicalExpressionReferenceTransform {
+    private final PushdownContext pushdownContext;
+    private ILogicalOperator operator;
+    private List<LogicalVariable> producedVariables;
+    private int expressionIndex;
+
+    public DefUseChainComputerVisitor(PushdownContext pushdownContext) {
+        this.pushdownContext = pushdownContext;
+    }
+
+    public void init(ILogicalOperator operator, List<LogicalVariable> producedVariables) {
+        this.operator = operator;
+        this.producedVariables = producedVariables;
+        expressionIndex = 0;
+    }
+
+    @Override
+    public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+        ILogicalExpression expression = exprRef.getValue();
+        // compute use chain first
+        LogicalVariable producedVariable = producedVariables != null ? producedVariables.get(expressionIndex) : null;
+        pushdownContext.use(operator, expression, expressionIndex, producedVariable);
+        if (producedVariable != null) {
+            // then define the produced variable
+            pushdownContext.define(producedVariable, operator, expression, expressionIndex);
+        }
+        expressionIndex++;
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
similarity index 98%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
index 32e4b18..825db1e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.optimizer.rules.pushdown;
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
new file mode 100644
index 0000000..65b131a
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.config.DatasetConfig.DatasetFormat;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * This visitor visits the entire plan and tries to build the information of the required values from all dataset
+ */
+public class PushdownOperatorVisitor implements ILogicalOperatorVisitor<Void, Void> {
+    private static final List<LogicalVariable> EMPTY_VARIABLES = Collections.emptyList();
+    private final PushdownContext pushdownContext;
+    private final IOptimizationContext context;
+    private final DefUseChainComputerVisitor defUseComputer;
+    private final Set<ILogicalOperator> visitedOperators;
+
+    public PushdownOperatorVisitor(PushdownContext pushdownContext, IOptimizationContext context) {
+        this.pushdownContext = pushdownContext;
+        this.context = context;
+        defUseComputer = new DefUseChainComputerVisitor(pushdownContext);
+        visitedOperators = new HashSet<>();
+    }
+
+    /**
+     * Visit every input of an operator. Then, start pushdown any value expression that the operator has
+     *
+     * @param op                the operator to process
+     * @param producedVariables any produced variables by the operator. We only care about the {@link AssignOperator}
+     *                          and {@link UnnestOperator} variables for now.
+     */
+    private void visitInputs(ILogicalOperator op, List<LogicalVariable> producedVariables) throws AlgebricksException {
+        if (visitedOperators.contains(op)) {
+            return;
+        }
+        for (Mutable<ILogicalOperator> child : op.getInputs()) {
+            child.getValue().accept(this, null);
+        }
+        visitedOperators.add(op);
+        // Enter scope for (new stage) for operators like GROUP and JOIN
+        pushdownContext.enterScope(op);
+        defUseComputer.init(op, producedVariables);
+        op.acceptExpressionTransform(defUseComputer);
+    }
+
+    /*
+     * ******************************************************************************
+     * Operators that need to handle special cases
+     * ******************************************************************************
+     */
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        //Set as empty records for data-scan or unnest-map if certain variables are projected out
+        setEmptyRecord(op.getInputs().get(0).getValue(), op.getVariables());
+        return null;
+    }
+
+    /**
+     * From the {@link DataSourceScanOperator}, we need to register the payload variable (record variable) to check
+     * which expression in the plan is using it.
+     */
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable((DataSource) op.getDataSource());
+        registerDatasetIfApplicable(datasetDataSource, op);
+        visitInputs(op);
+        return null;
+    }
+
+    /**
+     * From the {@link UnnestMapOperator}, we need to register the payload variable (record variable) to check
+     * which expression in the plan is using it.
+     */
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(getDataSourceFromUnnestMapOperator(op));
+        registerDatasetIfApplicable(datasetDataSource, op);
+        return null;
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op, op.getVariables());
+        if (!op.isGlobal() && isCountConstant(op.getExpressions())) {
+            /*
+             * Optimize the SELECT COUNT(*) case
+             * It is local aggregate and has agg-sql-count function with a constant argument. Set empty record if the
+             * input operator is DataSourceScanOperator
+             */
+            setEmptyRecord(op.getInputs().get(0).getValue(), EMPTY_VARIABLES);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op, op.getVariables());
+        return null;
+    }
+
+    /*
+     * ******************************************************************************
+     * Helper methods
+     * ******************************************************************************
+     */
+
+    /**
+     * The role of this method is:
+     * 1- Check whether the datasource allows value access pushdowns
+     * 2- return the actual DatasetDataSource
+     */
+    private DatasetDataSource getDatasetDataSourceIfApplicable(DataSource dataSource) throws AlgebricksException {
+        if (dataSource == null || dataSource.getDatasourceType() == DataSource.Type.SAMPLE) {
+            return null;
+        }
+
+        Dataset dataset = getDataset(dataSource);
+        //Only external dataset can have pushed down expressions
+        if (dataset.getDatasetType() == DatasetConfig.DatasetType.EXTERNAL
+                && !ExternalDataUtils
+                        .supportsPushdown(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties())
+                || dataset.getDatasetType() == DatasetConfig.DatasetType.INTERNAL
+                        && dataset.getDatasetFormatInfo().getFormat() == DatasetFormat.ROW) {
+            return null;
+        }
+
+        return (DatasetDataSource) dataSource;
+    }
+
+    private Dataset getDataset(DataSource dataSource) throws AlgebricksException {
+        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+        DataverseName dataverse = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+        return mp.findDataset(dataverse, datasetName);
+    }
+
+    /**
+     * Find datasource from {@link UnnestMapOperator}
+     *
+     * @param unnest unnest map operator
+     * @return datasource
+     */
+    private DataSource getDataSourceFromUnnestMapOperator(UnnestMapOperator unnest) throws AlgebricksException {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) unnest.getExpressionRef().getValue();
+        String dataverse = ConstantExpressionUtil.getStringArgument(funcExpr, 2);
+        String dataset = ConstantExpressionUtil.getStringArgument(funcExpr, 3);
+        if (!ConstantExpressionUtil.getStringArgument(funcExpr, 0).equals(dataset)) {
+            return null;
+        }
+
+        DataSourceId dsid = new DataSourceId(DataverseName.createFromCanonicalForm(dataverse), dataset);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        return metadataProvider.findDataSource(dsid);
+    }
+
+    private void registerDatasetIfApplicable(DatasetDataSource datasetDataSource, AbstractScanOperator op)
+            throws AlgebricksException {
+        if (datasetDataSource != null) {
+            Dataset dataset = getDataset(datasetDataSource);
+            List<LogicalVariable> primaryKeyVariables = datasetDataSource.getPrimaryKeyVariables(op.getVariables());
+            LogicalVariable recordVar = datasetDataSource.getDataRecordVariable(op.getVariables());
+            LogicalVariable metaVar = datasetDataSource.getMetaVariable(op.getVariables());
+            pushdownContext.registerScan(dataset, primaryKeyVariables, recordVar, metaVar, op);
+        }
+    }
+
+    /**
+     * If the inputOp is a {@link DataSourceScanOperator} or {@link UnnestMapOperator}, then set the projected value
+     * needed as empty record if any variable originated from either operators are not in {@code retainedVariables}
+     *
+     * @param inputOp           an operator that is potentially a {@link DataSourceScanOperator} or a {@link
+     *                          UnnestMapOperator}
+     * @param retainedVariables variables that should be retained
+     * @see #visitAggregateOperator(AggregateOperator, Void)
+     * @see #visitProjectOperator(ProjectOperator, Void)
+     */
+    private void setEmptyRecord(ILogicalOperator inputOp, List<LogicalVariable> retainedVariables)
+            throws AlgebricksException {
+        LogicalOperatorTag tag = inputOp.getOperatorTag();
+        if (tag != LogicalOperatorTag.DATASOURCESCAN && tag != LogicalOperatorTag.UNNEST_MAP) {
+            return;
+        }
+
+        DataSource dataSource;
+        List<LogicalVariable> variables;
+        Mutable<ILogicalExpression> selectCondition;
+        if (inputOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+            DataSourceScanOperator scan = (DataSourceScanOperator) inputOp;
+            dataSource = (DataSource) scan.getDataSource();
+            variables = scan.getVariables();
+            selectCondition = scan.getSelectCondition();
+        } else {
+            UnnestMapOperator unnest = (UnnestMapOperator) inputOp;
+            dataSource = getDataSourceFromUnnestMapOperator(unnest);
+            variables = unnest.getVariables();
+            selectCondition = unnest.getSelectCondition();
+        }
+
+        DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(dataSource);
+
+        if (datasetDataSource == null) {
+            //Does not support pushdown
+            return;
+        }
+
+        Set<LogicalVariable> selectConditionVariables = new HashSet<>();
+        if (selectCondition != null) {
+            //Get the used variables for a select condition
+            selectCondition.getValue().getUsedVariables(selectConditionVariables);
+        }
+
+        //We know that we only need the count of objects. So return empty objects only
+        LogicalVariable recordVar = datasetDataSource.getDataRecordVariable(variables);
+        ScanDefineDescriptor scanDefDesc = (ScanDefineDescriptor) pushdownContext.getDefineDescriptor(recordVar);
+
+        /*
+         * If the recordVar is not retained by an upper operator and not used by a select condition, then return empty
+         * record instead of the entire record.
+         */
+        if (!retainedVariables.contains(recordVar) && !selectConditionVariables.contains(recordVar)) {
+            /*
+             * Set the root node as EMPTY_ROOT_NODE (i.e., no fields will be read from disk). We register the
+             * dataset with EMPTY_ROOT_NODE so that we skip pushdowns on empty node.
+             */
+            scanDefDesc.setRecordNode(RootExpectedSchemaNode.EMPTY_ROOT_NODE);
+        }
+
+        if (scanDefDesc.hasMeta()) {
+            //Do the same for meta
+            if (!retainedVariables.contains(scanDefDesc.getMetaRecordVariable())) {
+                scanDefDesc.setMetaNode(RootExpectedSchemaNode.EMPTY_ROOT_NODE);
+            }
+        }
+    }
+
+    private boolean isCountConstant(List<Mutable<ILogicalExpression>> expressions) {
+        if (expressions.size() != 1) {
+            return false;
+        }
+        ILogicalExpression expression = expressions.get(0).getValue();
+        if (expression.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression;
+        FunctionIdentifier fid = funcExpr.getFunctionIdentifier();
+        return BuiltinFunctions.SQL_COUNT.equals(fid)
+                && funcExpr.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT;
+    }
+
+    private void visitSubplans(List<ILogicalPlan> nestedPlans) throws AlgebricksException {
+        for (ILogicalPlan plan : nestedPlans) {
+            for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+                root.getValue().accept(this, null);
+            }
+        }
+    }
+
+    /*
+     * ******************************************************************************
+     * Pushdown when possible for each operator
+     * ******************************************************************************
+     */
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        visitSubplans(op.getNestedPlans());
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op, op.getVariables());
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op, op.getVariables());
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op, op.getVariables());
+        visitSubplans(op.getNestedPlans());
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+            throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitForwardOperator(ForwardOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        visitSubplans(op.getNestedPlans());
+        return null;
+    }
+
+    private void visitInputs(ILogicalOperator op) throws AlgebricksException {
+        visitInputs(op, null);
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 277602d..06e949f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -44,6 +44,7 @@
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -54,6 +55,7 @@
 import org.apache.asterix.metadata.entities.CompactionPolicy;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
@@ -682,4 +684,28 @@
     public static boolean isNotView(Dataset dataset) {
         return dataset.getDatasetType() != DatasetType.VIEW;
     }
+
+    public static boolean isFieldAccessPushdownSupported(Dataset dataset) {
+        DatasetType datasetType = dataset.getDatasetType();
+        if (datasetType == DatasetType.INTERNAL) {
+            return dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
+        } else if (datasetType == DatasetType.EXTERNAL) {
+            ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+            return ExternalDataUtils.supportsPushdown(edd.getProperties());
+        }
+        return false;
+    }
+
+    public static boolean isFilterPushdownSupported(Dataset dataset) {
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            return dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
+        }
+        // TODO add external dataset with dynamic prefixes
+        return false;
+    }
+
+    public static boolean isRangeFilterPushdownSupported(Dataset dataset) {
+        return dataset.getDatasetType() == DatasetType.INTERNAL
+                && dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ProjectionFiltrationUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ProjectionFiltrationUtil.java
new file mode 100644
index 0000000..6f2d5a4
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ProjectionFiltrationUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+
+public class ProjectionFiltrationUtil {
+    //Default open record type when requesting the entire fields
+    public static final ARecordType ALL_FIELDS_TYPE = createType("");
+    //Default open record type when requesting none of the fields
+    public static final ARecordType EMPTY_TYPE = createType("{}");
+
+    private ProjectionFiltrationUtil() {
+    }
+
+    private static ARecordType createType(String typeName) {
+        return new ARecordType(typeName, new String[] {}, new IAType[] {}, true);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index 9553913..ced6371 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -89,6 +89,9 @@
         return gByList;
     }
 
+    /**
+     * @return returns the variables of the group-by keys
+     */
     public List<LogicalVariable> getGroupByVarList() {
         List<LogicalVariable> varList = new ArrayList<>(gByList.size());
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {
@@ -132,6 +135,18 @@
         }
     }
 
+    /**
+     * @return all variables produced by the group-by operator (for group-by-list followed by decor-list)
+     * Note: the list may contain null values -- as some decor expressions may not be assigned to variables
+     * @see #getProducedVariablesExceptNestedPlans(Collection) to get a collection of all variables without nulls
+     */
+    public List<LogicalVariable> getVariables() {
+        List<LogicalVariable> variables = new ArrayList<>(gByList.size() + decorList.size());
+        gByList.stream().map(Pair::getFirst).forEach(variables::add);
+        decorList.stream().map(Pair::getFirst).forEach(variables::add);
+        return variables;
+    }
+
     @Override
     public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) {
         for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {