[ASTERIXDB-3339][COMP] Fix filter pushdown with subplans

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Ensure that SELECT conditions in subplans are not
blindly pushed into data-scan.

Change-Id: Ib309ac7d756dd60b33e393ed44b953d2e773aa84
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18090
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index 21d6358..16aeecb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -92,7 +92,7 @@
         boolean changed = false;
         if (run) {
             // Context holds all the necessary information to perform pushdowns
-            PushdownContext pushdownContext = new PushdownContext();
+            PushdownContext pushdownContext = new PushdownContext(context);
             // Compute all the necessary pushdown information and performs inter-operator pushdown optimizations
             PushdownOperatorVisitor pushdownInfoComputer = new PushdownOperatorVisitor(pushdownContext, context);
             opRef.getValue().accept(pushdownInfoComputer, null);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index 92dbff8..1622c6e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown;
 
-import static org.apache.asterix.metadata.utils.PushdownUtil.getArrayConstantFromScanCollection;
-
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -27,28 +25,16 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.base.AOrderedList;
-import org.apache.asterix.om.constants.AsterixConstantValue;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -62,25 +48,40 @@
     private final Map<LogicalVariable, DefineDescriptor> defineChain;
     private final Map<LogicalVariable, List<UseDescriptor>> useChain;
     private final List<ILogicalOperator> scopes;
-    private final Map<ILogicalOperator, ILogicalExpression> inlinedCache;
+    private final FilterExpressionInlineVisitor inlineVisitor;
     private final Map<Dataset, List<ScanDefineDescriptor>> datasetToScans;
+    private ILogicalOperator currentSubplan;
 
-    public PushdownContext() {
+    public PushdownContext(IOptimizationContext context) {
         registeredScans = new ArrayList<>();
         this.definedVariable = new HashMap<>();
         this.defineChain = new HashMap<>();
         this.useChain = new HashMap<>();
         scopes = new ArrayList<>();
-        inlinedCache = new HashMap<>();
+        inlineVisitor = new FilterExpressionInlineVisitor(this, context);
         datasetToScans = new HashMap<>();
     }
 
     public void enterScope(ILogicalOperator operator) {
-        if (SCOPE_OPERATORS.contains(operator.getOperatorTag())) {
+        LogicalOperatorTag opTag = operator.getOperatorTag();
+        if (SCOPE_OPERATORS.contains(opTag)) {
+            scopes.add(operator);
+        } else if (opTag == LogicalOperatorTag.AGGREGATE && currentSubplan == null) {
+            // Advance scope for aggregate if the aggregate is not in a subplan
             scopes.add(operator);
         }
     }
 
+    public ILogicalOperator enterSubplan(ILogicalOperator subplanOp) {
+        ILogicalOperator previous = currentSubplan;
+        currentSubplan = subplanOp;
+        return previous;
+    }
+
+    public void exitSubplan(ILogicalOperator previousSubplan) {
+        currentSubplan = previousSubplan;
+    }
+
     public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
             LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
         ScanDefineDescriptor scanDefDesc =
@@ -100,6 +101,10 @@
         datasetScans.add(scanDefDesc);
     }
 
+    public Map<Dataset, List<ScanDefineDescriptor>> getDatasetToScanDefinitionDescriptors() {
+        return datasetToScans;
+    }
+
     public void define(LogicalVariable variable, ILogicalOperator operator, ILogicalExpression expression,
             int expressionIndex) {
         if (defineChain.containsKey(variable)) {
@@ -117,7 +122,7 @@
 
         int scope = scopes.size();
         DefineDescriptor defineDescriptor =
-                new DefineDescriptor(scope, variable, operator, expression, expressionIndex);
+                new DefineDescriptor(scope, currentSubplan, variable, operator, expression, expressionIndex);
         definedVariable.put(expression, defineDescriptor);
         defineChain.put(variable, defineDescriptor);
         useChain.put(variable, new ArrayList<>());
@@ -126,7 +131,8 @@
     public void use(ILogicalOperator operator, ILogicalExpression expression, int expressionIndex,
             LogicalVariable producedVariable) {
         int scope = scopes.size();
-        UseDescriptor useDescriptor = new UseDescriptor(scope, operator, expression, expressionIndex, producedVariable);
+        UseDescriptor useDescriptor =
+                new UseDescriptor(scope, currentSubplan, operator, expression, expressionIndex, producedVariable);
         Set<LogicalVariable> usedVariables = useDescriptor.getUsedVariables();
         expression.getUsedVariables(usedVariables);
         for (LogicalVariable variable : usedVariables) {
@@ -162,98 +168,14 @@
         return registeredScans;
     }
 
-    public ILogicalExpression cloneAndInlineExpression(UseDescriptor useDescriptor, IOptimizationContext context)
-            throws CompilationException {
-        ILogicalOperator op = useDescriptor.getOperator();
-        ILogicalExpression inlinedExpr = inlinedCache.get(op);
-        if (inlinedExpr == null) {
-            inlinedExpr = cloneAndInline(useDescriptor.getExpression(), context);
-            inlinedCache.put(op, inlinedExpr);
-        }
-
-        // Clone the cached expression as a processor may change it
-        return inlinedExpr.cloneExpression();
-    }
-
-    public Map<Dataset, List<ScanDefineDescriptor>> getDatasetToScanDefinitionDescriptors() {
-        return datasetToScans;
-    }
-
-    private ILogicalExpression cloneAndInline(ILogicalExpression expression, IOptimizationContext context)
-            throws CompilationException {
-        switch (expression.getExpressionTag()) {
-            case CONSTANT:
-                return expression;
-            case FUNCTION_CALL:
-                return cloneAndInlineFunction(expression, context);
-            case VARIABLE:
-                LogicalVariable variable = ((VariableReferenceExpression) expression).getVariableReference();
-                DefineDescriptor defineDescriptor = defineChain.get(variable);
-                if (defineDescriptor == null || defineDescriptor.isScanDefinition()) {
-                    // Reached un-filterable source variable (e.g., originated from an internal dataset in row format)
-                    // or filterable source recordVariable (e.g., columnar dataset or external dataset with prefix)
-                    return expression;
-                }
-                return cloneAndInline(defineDescriptor.getExpression(), context);
-            default:
-                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, expression.getSourceLocation());
-        }
-    }
-
-    private ILogicalExpression cloneAndInlineFunction(ILogicalExpression expression, IOptimizationContext context)
-            throws CompilationException {
-        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression.cloneExpression();
-        for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
-            arg.setValue(cloneAndInline(arg.getValue(), context));
-        }
-        return convertToOr(funcExpr, context);
-    }
-
-    /**
-     * Converts eq(scan-collection(array: [a, b, c...]), expr) to or(eq(a, expr), eq(b, expr), eq(c, expr), ...)
-     *
-     * @param expression a function expression
-     * @return a converted expression if applicable
-     */
-    private static ILogicalExpression convertToOr(AbstractFunctionCallExpression expression,
-            IOptimizationContext context) {
-        if (!BuiltinFunctions.EQ.equals(expression.getFunctionIdentifier())) {
-            return expression;
-        }
-        ILogicalExpression left = expression.getArguments().get(0).getValue();
-        ILogicalExpression right = expression.getArguments().get(1).getValue();
-
-        ILogicalExpression valueExpr = left;
-        AOrderedList constArray = getArrayConstantFromScanCollection(right);
-        if (constArray == null) {
-            valueExpr = right;
-            constArray = getArrayConstantFromScanCollection(left);
-        }
-
-        if (constArray == null) {
-            return expression;
-        }
-
-        IFunctionInfo orInfo = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.OR);
-        List<Mutable<ILogicalExpression>> orArgs = new ArrayList<>();
-        AbstractFunctionCallExpression orExpr = new ScalarFunctionCallExpression(orInfo, orArgs);
-
-        IFunctionInfo eqInfo = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.EQ);
-        for (int i = 0; i < constArray.size(); i++) {
-            List<Mutable<ILogicalExpression>> eqArgs = new ArrayList<>(2);
-            eqArgs.add(new MutableObject<>(valueExpr));
-            eqArgs.add(new MutableObject<>(new ConstantExpression(new AsterixConstantValue(constArray.getItem(i)))));
-
-            orArgs.add(new MutableObject<>(new ScalarFunctionCallExpression(eqInfo, eqArgs)));
-        }
-
-        return orExpr;
+    public FilterExpressionInlineVisitor getInlineVisitor() {
+        return inlineVisitor;
     }
 
     private static Set<LogicalOperatorTag> getScopeOperators() {
         return EnumSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.GROUP,
-                LogicalOperatorTag.AGGREGATE, LogicalOperatorTag.WINDOW, LogicalOperatorTag.RUNNINGAGGREGATE,
-                LogicalOperatorTag.UNIONALL, LogicalOperatorTag.INTERSECT);
+                LogicalOperatorTag.WINDOW, LogicalOperatorTag.RUNNINGAGGREGATE, LogicalOperatorTag.UNIONALL,
+                LogicalOperatorTag.INTERSECT);
     }
 
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
index 601b7d5..e0b71ac 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/AbstractDescriptor.java
@@ -23,13 +23,15 @@
 
 class AbstractDescriptor {
     protected final int scope;
+    protected final ILogicalOperator subplanOperator;
     protected final ILogicalOperator operator;
     protected final ILogicalExpression expression;
     protected final int expressionIndex;
 
-    public AbstractDescriptor(int scope, ILogicalOperator operator, ILogicalExpression expression,
-            int expressionIndex) {
+    public AbstractDescriptor(int scope, ILogicalOperator subplanOperator, ILogicalOperator operator,
+            ILogicalExpression expression, int expressionIndex) {
         this.scope = scope;
+        this.subplanOperator = subplanOperator;
         this.operator = operator;
         this.expression = expression;
         this.expressionIndex = expressionIndex;
@@ -50,4 +52,12 @@
     public int getScope() {
         return scope;
     }
+
+    public boolean inSubplan() {
+        return subplanOperator != null;
+    }
+
+    public ILogicalOperator getSubplanOperator() {
+        return subplanOperator;
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
index e263e7f..78e2676 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/DefineDescriptor.java
@@ -25,9 +25,9 @@
 public class DefineDescriptor extends AbstractDescriptor {
     private final LogicalVariable variable;
 
-    public DefineDescriptor(int scope, LogicalVariable variable, ILogicalOperator operator,
-            ILogicalExpression expression, int expressionIndex) {
-        super(scope, operator, expression, expressionIndex);
+    public DefineDescriptor(int scope, ILogicalOperator subplanOperator, LogicalVariable variable,
+            ILogicalOperator operator, ILogicalExpression expression, int expressionIndex) {
+        super(scope, subplanOperator, operator, expression, expressionIndex);
         this.variable = variable;
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
index a8246cb..3f3fa20 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ScanDefineDescriptor.java
@@ -47,7 +47,7 @@
 
     public ScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
             LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
-        super(scope, recordVariable, operator, null, -1);
+        super(scope, null, recordVariable, operator, null, -1);
         this.primaryKeyVariables = primaryKeyVariables;
         this.metaRecordVariable = metaRecordVariable;
         this.dataset = dataset;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
index 7502013..a213b6f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/UseDescriptor.java
@@ -29,9 +29,9 @@
     private final Set<LogicalVariable> usedVariables;
     private final LogicalVariable producedVariable;
 
-    public UseDescriptor(int scope, ILogicalOperator operator, ILogicalExpression expression, int expressionIndex,
-            LogicalVariable producedVariable) {
-        super(scope, operator, expression, expressionIndex);
+    public UseDescriptor(int scope, ILogicalOperator subplanOperator, ILogicalOperator operator,
+            ILogicalExpression expression, int expressionIndex, LogicalVariable producedVariable) {
+        super(scope, subplanOperator, operator, expression, expressionIndex);
         this.usedVariables = new HashSet<>();
         this.producedVariable = producedVariable;
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 02870c5..a150a10 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -23,10 +23,14 @@
 import static org.apache.asterix.metadata.utils.PushdownUtil.isCompare;
 import static org.apache.asterix.metadata.utils.PushdownUtil.isConstant;
 import static org.apache.asterix.metadata.utils.PushdownUtil.isFilterPath;
+import static org.apache.asterix.metadata.utils.PushdownUtil.isSupportedFilterAggregateFunction;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.asterix.om.base.IAObject;
@@ -34,6 +38,7 @@
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -41,15 +46,22 @@
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 
 abstract class AbstractFilterPushdownProcessor extends AbstractPushdownProcessor {
     private final Set<ILogicalOperator> visitedOperators;
+    private final Map<ILogicalOperator, List<UseDescriptor>> subplanSelects;
+    private final List<UseDescriptor> scanCandidateFilters;
+    private final Set<LogicalVariable> subplanProducedVariables;
 
     public AbstractFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
         super(pushdownContext, context);
         visitedOperators = new HashSet<>();
+        subplanSelects = new HashMap<>();
+        scanCandidateFilters = new ArrayList<>();
+        subplanProducedVariables = new HashSet<>();
     }
 
     @Override
@@ -60,8 +72,12 @@
             if (skip(scanDefineDescriptor)) {
                 continue;
             }
+            subplanSelects.clear();
+            scanCandidateFilters.clear();
             prepareScan(scanDefineDescriptor);
-            changed |= pushdownFilter(scanDefineDescriptor, scanDefineDescriptor);
+            collectFiltersInformation(scanDefineDescriptor, scanDefineDescriptor);
+            putPotentialSelects(scanDefineDescriptor);
+            changed |= pushdownFilter(scanDefineDescriptor);
         }
         return changed;
     }
@@ -91,12 +107,12 @@
             throws AlgebricksException;
 
     /**
-     * Is an expression pushable
+     * Is an expression NOT pushable
      *
      * @param expression the expression to push down
-     * @return true if it is pushable, false otherwise
+     * @return true if it is NOT pushable, false otherwise
      */
-    protected abstract boolean isPushable(AbstractFunctionCallExpression expression);
+    protected abstract boolean isNotPushable(AbstractFunctionCallExpression expression);
 
     /**
      * Handle a compare function
@@ -123,53 +139,117 @@
     protected abstract void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor,
             ILogicalExpression inlinedExpr) throws AlgebricksException;
 
-    private boolean pushdownFilter(DefineDescriptor defineDescriptor, ScanDefineDescriptor scanDefineDescriptor)
-            throws AlgebricksException {
+    /**
+     * Collects all the selects that appear at the same scope of 'defineDescriptor' and that are not part of a subplan
+     *
+     * @param defineDescriptor to get its use descriptors
+     * @param scanDescriptor   data-scan descriptor
+     */
+    private void collectFiltersInformation(DefineDescriptor defineDescriptor, ScanDefineDescriptor scanDescriptor) {
         List<UseDescriptor> useDescriptors = pushdownContext.getUseDescriptors(defineDescriptor);
-        boolean changed = false;
+
+        // First find candidates for filter pushdowns
         for (UseDescriptor useDescriptor : useDescriptors) {
-            /*
-             * Pushdown works only if the scope(use) and scope(scan) are the same, as we cannot pushdown when
-             * scope(use) > scope(scan) (e.g., after join or group-by)
-             */
-            ILogicalOperator useOperator = useDescriptor.getOperator();
-            if (useDescriptor.getScope() == scanDefineDescriptor.getScope()
-                    && (useOperator.getOperatorTag() == LogicalOperatorTag.SELECT
-                            || useOperator.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN)
-                    && isPushdownAllowed(useOperator)) {
-                changed |= inlineAndPushdownFilter(useDescriptor, scanDefineDescriptor);
-            } else if (useOperator.getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
-                changed |= inlineAndPushdownFilter(useDescriptor, scanDefineDescriptor);
+            if (visitedOperators.contains(useDescriptor.getOperator())) {
+                continue;
+            }
+            if (canPushSelect(useDescriptor, scanDescriptor)) {
+                scanCandidateFilters.add(useDescriptor);
+            } else if (useDescriptor.getOperator().getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
+                scanCandidateFilters.add(useDescriptor);
             }
         }
 
+        // Next, descend using the def-use chain to find other candidates
         for (UseDescriptor useDescriptor : useDescriptors) {
             DefineDescriptor nextDefineDescriptor = pushdownContext.getDefineDescriptor(useDescriptor);
             if (nextDefineDescriptor != null) {
-                changed |= pushdownFilter(nextDefineDescriptor, scanDefineDescriptor);
+                collectFiltersInformation(nextDefineDescriptor, scanDescriptor);
             }
+            visitedOperators.add(useDescriptor.getOperator());
+        }
+    }
+
+    /**
+     * If {@link #subplanSelects} is not empty, the check if the subplan correspond to some sub-filter
+     * that cannot be linked using the def-use chain
+     * Example:
+     * <p>
+     * select ($$26)
+     * ... subplan
+     * ... ... aggregate [$$26] <- [non-empty-stream()]
+     * ... ... select (SOME_CONDITION)
+     * <p>
+     * In this example, the def-use chain cannot "chain" the nested SELECT with the upper SELECT 'select ($$26)' as the
+     * function 'non-empty-stream()' is argument-less and does not use any variable originated from the data-scan.
+     * However, we can do the "linking" by checking the produced variables of the subplan and find all their associated
+     * {@link DefineDescriptor}. In the example, that would be the variable $$26 which is defined as
+     * aggregate [$$26] <- [non-empty-stream()]. This would establish the connection between 'select ($$26)' and the
+     * nested 'select (SOME_CONDITION)'
+     *
+     * @param scanDescriptor data-scan descriptor
+     */
+    private void putPotentialSelects(ScanDefineDescriptor scanDescriptor) throws AlgebricksException {
+        for (Map.Entry<ILogicalOperator, List<UseDescriptor>> selects : subplanSelects.entrySet()) {
+            ILogicalOperator subplan = selects.getKey();
+            subplanProducedVariables.clear();
+            VariableUtilities.getProducedVariables(subplan, subplanProducedVariables);
+            for (LogicalVariable producedVar : subplanProducedVariables) {
+                DefineDescriptor defineDescriptor = pushdownContext.getDefineDescriptor(producedVar);
+                if (defineDescriptor != null && !visitedOperators.contains(defineDescriptor.getOperator())
+                        && isSupportedFilterAggregateFunction(defineDescriptor.getExpression())) {
+                    // A define descriptor that has not been visited and has a supported filter aggregate function
+                    // check for any missed SELECT
+                    collectFiltersInformation(defineDescriptor, scanDescriptor);
+                }
+            }
+        }
+    }
+
+    private boolean pushdownFilter(ScanDefineDescriptor scanDescriptor) throws AlgebricksException {
+        boolean changed = false;
+        for (UseDescriptor candidate : scanCandidateFilters) {
+            changed |= inlineAndPushdownFilter(candidate, scanDescriptor);
         }
 
         return changed;
     }
 
-    private boolean isPushdownAllowed(ILogicalOperator useOperator) {
-        Boolean disallowed = (Boolean) useOperator.getAnnotations()
-                .getOrDefault(OperatorAnnotations.DISALLOW_FILTER_PUSHDOWN_TO_SCAN, Boolean.FALSE);
-        return disallowed == Boolean.FALSE;
+    private boolean canPushSelect(UseDescriptor useDescriptor, ScanDefineDescriptor scanDescriptor) {
+        ILogicalOperator useOperator = useDescriptor.getOperator();
+        /*
+         * Pushdown works only if the scope(use) and scope(scan) are the same, as we cannot pushdown when
+         * scope(use) > scope(scan) (e.g., after join or group-by)
+         */
+        if (useDescriptor.getScope() != scanDescriptor.getScope()) {
+            return false;
+        }
+
+        // only select or data-scan are allowed (scan can have pushed condition)
+        if (useOperator.getOperatorTag() != LogicalOperatorTag.SELECT
+                && useOperator.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
+            return false;
+        }
+
+        // Do not push selects in sub-plan now. They will be pushed later on
+        boolean inSubplan = useDescriptor.inSubplan();
+        if (inSubplan && useOperator.getOperatorTag() == LogicalOperatorTag.SELECT) {
+            ILogicalOperator subplanOp = useDescriptor.getSubplanOperator();
+            List<UseDescriptor> selects = subplanSelects.computeIfAbsent(subplanOp, k -> new ArrayList<>());
+            selects.add(useDescriptor);
+        }
+
+        // Finally, push down if not in subplan
+        return !inSubplan;
     }
 
     private boolean inlineAndPushdownFilter(UseDescriptor useDescriptor, ScanDefineDescriptor scanDefineDescriptor)
             throws AlgebricksException {
-        ILogicalOperator op = useDescriptor.getOperator();
-        if (visitedOperators.contains(op)) {
-            // Skip and follow through to find any other selects that can be pushed down
-            return false;
-        }
         boolean changed = false;
 
+        FilterExpressionInlineVisitor inliningVisitor = pushdownContext.getInlineVisitor();
         // Get a clone of the operator's expression and inline it
-        ILogicalExpression inlinedExpr = pushdownContext.cloneAndInlineExpression(useDescriptor, context);
+        ILogicalExpression inlinedExpr = inliningVisitor.cloneAndInline(useDescriptor, subplanSelects);
 
         // Prepare for pushdown
         preparePushdown(useDescriptor, scanDefineDescriptor);
@@ -178,8 +258,6 @@
             changed = true;
         }
 
-        // Do not push down a select twice.
-        visitedOperators.add(op);
         return changed;
     }
 
@@ -218,7 +296,7 @@
     }
 
     private boolean handleFunction(AbstractFunctionCallExpression expression) throws AlgebricksException {
-        if (!expression.getFunctionInfo().isFunctional() || !isPushable(expression)) {
+        if (!expression.getFunctionInfo().isFunctional() || isNotPushable(expression)) {
             return false;
         }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index d4db751..64c3cf8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown.processor;
 
-import static org.apache.asterix.metadata.utils.PushdownUtil.RANGE_FILTER_PUSHABLE_FUNCTIONS;
-import static org.apache.asterix.metadata.utils.PushdownUtil.isNestedFunction;
-import static org.apache.asterix.metadata.utils.PushdownUtil.isTypeFunction;
+import static org.apache.asterix.metadata.utils.PushdownUtil.isProhibitedFilterFunction;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -103,9 +101,9 @@
     }
 
     @Override
-    protected boolean isPushable(AbstractFunctionCallExpression expression) {
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
         FunctionIdentifier fid = expression.getFunctionIdentifier();
-        return RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(fid) || !isNestedFunction(fid) && !isTypeFunction(fid);
+        return isProhibitedFilterFunction(expression);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java
index b56c550..e61a18f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnRangeFilterPushdownProcessor.java
@@ -78,8 +78,8 @@
     }
 
     @Override
-    protected boolean isPushable(AbstractFunctionCallExpression expression) {
-        return RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(expression.getFunctionIdentifier());
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+        return !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(expression.getFunctionIdentifier());
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
index 96b252a..1007313 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
@@ -67,9 +67,9 @@
     }
 
     @Override
-    protected boolean isPushable(AbstractFunctionCallExpression expression) {
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
         FunctionIdentifier fid = expression.getFunctionIdentifier();
-        return !ARRAY_FUNCTIONS.contains(fid) && super.isPushable(expression);
+        return ARRAY_FUNCTIONS.contains(fid) || super.isNotPushable(expression);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/FilterExpressionInlineVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/FilterExpressionInlineVisitor.java
new file mode 100644
index 0000000..56ab226
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/FilterExpressionInlineVisitor.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.visitor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.getArrayConstantFromScanCollection;
+import static org.apache.asterix.metadata.utils.PushdownUtil.isSupportedFilterAggregateFunction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.annotations.ExistsComparisonExpressionAnnotation;
+import org.apache.asterix.om.base.AOrderedList;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+
+public class FilterExpressionInlineVisitor
+        implements ILogicalExpressionVisitor<ILogicalExpression, Map<ILogicalOperator, List<UseDescriptor>>> {
+
+    private final PushdownContext pushdownContext;
+    private final IOptimizationContext context;
+    private final Map<ILogicalOperator, ILogicalExpression> inlinedCache;
+
+    public FilterExpressionInlineVisitor(PushdownContext pushdownContext, IOptimizationContext context) {
+        this.pushdownContext = pushdownContext;
+        this.context = context;
+        inlinedCache = new HashMap<>();
+    }
+
+    public ILogicalExpression cloneAndInline(UseDescriptor useDescriptor,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        ILogicalOperator op = useDescriptor.getOperator();
+        ILogicalExpression inlinedExpr = inlinedCache.get(op);
+        if (inlinedExpr == null) {
+            inlinedExpr = useDescriptor.getExpression().accept(this, subplanSelects);
+            inlinedCache.put(op, inlinedExpr);
+        }
+
+        // Clone the cached expression as a processor may change it
+        return inlinedExpr.cloneExpression();
+    }
+
+    @Override
+    public ILogicalExpression visitConstantExpression(ConstantExpression expr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        return expr;
+    }
+
+    @Override
+    public ILogicalExpression visitVariableReferenceExpression(VariableReferenceExpression expr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        LogicalVariable variable = expr.getVariableReference();
+        DefineDescriptor defineDescriptor = pushdownContext.getDefineDescriptor(variable);
+        if (defineDescriptor == null || defineDescriptor.isScanDefinition()) {
+            // Reached un-filterable source variable (e.g., originated from an internal dataset in row format)
+            // or filterable source recordVariable (e.g., columnar dataset or external dataset with prefix)
+            return expr.cloneExpression();
+        }
+
+        ILogicalOperator subplanOp = defineDescriptor.getSubplanOperator();
+        ILogicalExpression defExpr = defineDescriptor.getExpression();
+        if (subplanOp != null && subplanSelects.containsKey(subplanOp) && isSupportedFilterAggregateFunction(defExpr)) {
+            List<UseDescriptor> selects = subplanSelects.get(subplanOp);
+            return visitSubplanSelects(selects, subplanSelects);
+        }
+
+        return defineDescriptor.getExpression().accept(this, subplanSelects);
+    }
+
+    @Override
+    public ILogicalExpression visitAggregateFunctionCallExpression(AggregateFunctionCallExpression expr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        return cloneAndInlineFunction(expr, subplanSelects);
+    }
+
+    @Override
+    public ILogicalExpression visitScalarFunctionCallExpression(ScalarFunctionCallExpression expr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        ILogicalExpression inlinable = getInlinableExpression(expr);
+        if (inlinable.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            return inlinable.accept(this, subplanSelects);
+        }
+        return cloneAndInlineFunction(expr, subplanSelects);
+    }
+
+    @Override
+    public ILogicalExpression visitStatefulFunctionCallExpression(StatefulFunctionCallExpression expr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        return cloneAndInlineFunction(expr, subplanSelects);
+    }
+
+    @Override
+    public ILogicalExpression visitUnnestingFunctionCallExpression(UnnestingFunctionCallExpression expr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        return cloneAndInlineFunction(expr, subplanSelects);
+    }
+
+    private ILogicalExpression cloneAndInlineFunction(AbstractFunctionCallExpression funcExpr,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        AbstractFunctionCallExpression cloned = (AbstractFunctionCallExpression) funcExpr.cloneExpression();
+        for (Mutable<ILogicalExpression> arg : cloned.getArguments()) {
+            arg.setValue(arg.getValue().accept(this, subplanSelects));
+        }
+        return convertToOr(cloned, context);
+    }
+
+    /**
+     * @param expression current scalar function
+     * @return if annotated with {@link ExistsComparisonExpressionAnnotation} then return the count variable expression
+     * or return the same function
+     */
+    private ILogicalExpression getInlinableExpression(ScalarFunctionCallExpression expression) {
+        ScalarFunctionCallExpression funcExpr = expression.cloneExpression();
+        IExpressionAnnotation existsAnnotation = funcExpr.getAnnotation(ExistsComparisonExpressionAnnotation.class);
+        if (existsAnnotation != null) {
+            for (Mutable<ILogicalExpression> argRef : funcExpr.getArguments()) {
+                // Get the variable expression, which is a result from an aggregate
+                ILogicalExpression arg = argRef.getValue();
+                if (arg.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                    return arg;
+                }
+            }
+        }
+        return funcExpr;
+    }
+
+    private boolean notContainsZeroConstant(AbstractFunctionCallExpression funcExpr) {
+        for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
+            Long argValue = ConstantExpressionUtil.getLongConstant(arg.getValue());
+            if (argValue != null && argValue == 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Converts eq(scan-collection(array: [a, b, c...]), expr) to or(eq(a, expr), eq(b, expr), eq(c, expr), ...)
+     *
+     * @param expression a function expression
+     * @return a converted expression if applicable
+     */
+    private static ILogicalExpression convertToOr(AbstractFunctionCallExpression expression,
+            IOptimizationContext context) {
+        if (!BuiltinFunctions.EQ.equals(expression.getFunctionIdentifier())) {
+            return expression;
+        }
+        ILogicalExpression left = expression.getArguments().get(0).getValue();
+        ILogicalExpression right = expression.getArguments().get(1).getValue();
+
+        ILogicalExpression valueExpr = left;
+        AOrderedList constArray = getArrayConstantFromScanCollection(right);
+        if (constArray == null) {
+            valueExpr = right;
+            constArray = getArrayConstantFromScanCollection(left);
+        }
+
+        if (constArray == null) {
+            return expression;
+        }
+
+        IFunctionInfo orInfo = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.OR);
+        List<Mutable<ILogicalExpression>> orArgs = new ArrayList<>();
+        AbstractFunctionCallExpression orExpr = new ScalarFunctionCallExpression(orInfo, orArgs);
+
+        IFunctionInfo eqInfo = context.getMetadataProvider().lookupFunction(AlgebricksBuiltinFunctions.EQ);
+        for (int i = 0; i < constArray.size(); i++) {
+            List<Mutable<ILogicalExpression>> eqArgs = new ArrayList<>(2);
+            eqArgs.add(new MutableObject<>(valueExpr));
+            eqArgs.add(new MutableObject<>(new ConstantExpression(new AsterixConstantValue(constArray.getItem(i)))));
+
+            orArgs.add(new MutableObject<>(new ScalarFunctionCallExpression(eqInfo, eqArgs)));
+        }
+
+        return orExpr;
+    }
+
+    private ILogicalExpression visitSubplanSelects(List<UseDescriptor> useDescriptors,
+            Map<ILogicalOperator, List<UseDescriptor>> subplanSelects) throws AlgebricksException {
+        if (useDescriptors.size() == 1) {
+            // A single select exists in the subplan. Inline and clone.
+            return useDescriptors.get(0).getExpression().accept(this, subplanSelects);
+        }
+
+        // Multiple selects exist in the subplan, inline each then add all inlined expression as a single AND
+        List<Mutable<ILogicalExpression>> andArgs = new ArrayList<>();
+        for (UseDescriptor useDescriptor : useDescriptors) {
+            ILogicalExpression inlined = useDescriptor.getExpression().accept(this, subplanSelects);
+            andArgs.add(new MutableObject<>(inlined));
+        }
+
+        IFunctionInfo fInfo = context.getMetadataProvider().lookupFunction(BuiltinFunctions.AND);
+        return new ScalarFunctionCallExpression(fInfo, andArgs);
+    }
+
+    private static FunctionIdentifier[] createExistsPattern() {
+        FunctionIdentifier[] pattern = new FunctionIdentifier[2];
+        pattern[0] = BuiltinFunctions.NEQ;
+        pattern[1] = BuiltinFunctions.COUNT;
+
+        return pattern;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
index 28309aa..c425d72 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/PushdownOperatorVisitor.java
@@ -363,12 +363,14 @@
                 && funcExpr.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.CONSTANT;
     }
 
-    private void visitSubplans(List<ILogicalPlan> nestedPlans) throws AlgebricksException {
+    private void visitNestedPlans(ILogicalOperator op, List<ILogicalPlan> nestedPlans) throws AlgebricksException {
+        ILogicalOperator previousSubplanOp = pushdownContext.enterSubplan(op);
         for (ILogicalPlan plan : nestedPlans) {
             for (Mutable<ILogicalOperator> root : plan.getRoots()) {
                 root.getValue().accept(this, null);
             }
         }
+        pushdownContext.exitSubplan(previousSubplanOp);
     }
 
     /*
@@ -386,7 +388,7 @@
     @Override
     public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
         visitInputs(op);
-        visitSubplans(op.getNestedPlans());
+        visitNestedPlans(op, op.getNestedPlans());
         return null;
     }
 
@@ -410,7 +412,7 @@
     @Override
     public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
         visitInputs(op, op.getVariables());
-        visitSubplans(op.getNestedPlans());
+        visitNestedPlans(op, op.getNestedPlans());
         return null;
     }
 
@@ -556,7 +558,7 @@
     @Override
     public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
         visitInputs(op);
-        visitSubplans(op.getNestedPlans());
+        visitNestedPlans(op, op.getNestedPlans());
         return null;
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 08561c0..56719e2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -33,6 +33,7 @@
 
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
 import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.common.annotations.ExistsComparisonExpressionAnnotation;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -1553,8 +1554,6 @@
                         BuiltinFunctions.getBuiltinFunctionInfo(AlgebricksBuiltinFunctions.NOT), notArgs);
                 notExpr.setSourceLocation(sourceLoc);
                 s = new SelectOperator(new MutableObject<>(notExpr));
-                // Disable pushdowns
-                s.getAnnotations().put(OperatorAnnotations.DISALLOW_FILTER_PUSHDOWN_TO_SCAN, Boolean.TRUE);
                 s.getInputs().add(eo2.second);
                 s.setSourceLocation(sourceLoc);
                 fAgg = BuiltinFunctions.makeAggregateFunctionExpression(BuiltinFunctions.EMPTY_STREAM,
@@ -2198,8 +2197,6 @@
         NestedTupleSourceOperator ntsOp = new NestedTupleSourceOperator(new MutableObject<>(subplanOp));
         ntsOp.setSourceLocation(sourceLoc);
         SelectOperator select = new SelectOperator(selectExpr);
-        // Disable pushdowns
-        select.getAnnotations().put(OperatorAnnotations.DISALLOW_FILTER_PUSHDOWN_TO_SCAN, Boolean.TRUE);
         // The select operator cannot be moved up and down, otherwise it will cause
         // typing issues (ASTERIXDB-1203).
         OperatorPropertiesUtil.markMovable(select, false);
@@ -2233,6 +2230,10 @@
         count.setSourceLocation(sourceLoc);
         AbstractFunctionCallExpression comparison = new ScalarFunctionCallExpression(
                 FunctionUtil.getFunctionInfo(not ? BuiltinFunctions.EQ : BuiltinFunctions.NEQ));
+        if (!not) {
+            // Indicate this comparison is for EXISTS
+            comparison.putAnnotation(ExistsComparisonExpressionAnnotation.INSTANCE);
+        }
         ConstantExpression eZero = new ConstantExpression(new AsterixConstantValue(new AInt64(0L)));
         eZero.setSourceLocation(sourceLoc);
         comparison.getArguments().add(new MutableObject<>(count));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppAnalyzedExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppAnalyzedExecutionTest.java
index 2f3007d..2e32954 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppAnalyzedExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppAnalyzedExecutionTest.java
@@ -45,7 +45,8 @@
     protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-analyze.conf";
     private final String[] denyList = { "synonym: synonym-01", "ddl: analyze-dataset-1", "misc: dump_index",
             "array-index: composite-index-queries", "filters: upsert", "column: analyze-dataset",
-            "column: filter/boolean", "ddl: analyze-dataset-with-indexes", "warnings: cardinality-hint-warning" };
+            "column: filter/boolean", "column: filter/sql-compat"
+            /*re-enable once the cbo stats class cast bug is fixed*/, "ddl: analyze-dataset-with-indexes", "warnings: cardinality-hint-warning" };
 
     @BeforeClass
     public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.000.ddl.sqlpp
new file mode 100644
index 0000000..226b2e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.000.ddl.sqlpp
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE DATASET RowDataset
+PRIMARY KEY (`key`: string) WITH {
+    "storage-format": {"format": "row"}
+};
+
+CREATE VIEW `RowView`(
+ `key` STRING,
+ `num0` DOUBLE,
+ `num1` DOUBLE,
+ `num2` DOUBLE,
+ `num3` DOUBLE,
+ `num4` DOUBLE,
+ `str0` STRING,
+ `str1` STRING,
+ `str2` STRING,
+ `str3` STRING,
+ `int0` BIGINT,
+ `int1` BIGINT,
+ `int2` BIGINT,
+ `int3` BIGINT,
+ `bool0` BOOLEAN,
+ `bool1` BOOLEAN,
+ `bool2` BOOLEAN,
+ `bool3` BOOLEAN,
+ `date0` DATE,
+ `date1` DATE,
+ `date2` DATE,
+ `date3` DATE,
+ `time0` DATETIME,
+ `time1` TIME,
+ `datetime0` DATETIME,
+ `datetime1` STRING,
+ `zzz` STRING
+) DEFAULT NULL AS RowDataset;
+
+CREATE DATASET ColumnDataset
+PRIMARY KEY (`key`: string) WITH {
+    "storage-format": {"format": "column"}
+};
+
+CREATE VIEW `ColumnView`(
+ `key` STRING,
+ `num0` DOUBLE,
+ `num1` DOUBLE,
+ `num2` DOUBLE,
+ `num3` DOUBLE,
+ `num4` DOUBLE,
+ `str0` STRING,
+ `str1` STRING,
+ `str2` STRING,
+ `str3` STRING,
+ `int0` BIGINT,
+ `int1` BIGINT,
+ `int2` BIGINT,
+ `int3` BIGINT,
+ `bool0` BOOLEAN,
+ `bool1` BOOLEAN,
+ `bool2` BOOLEAN,
+ `bool3` BOOLEAN,
+ `date0` DATE,
+ `date1` DATE,
+ `date2` DATE,
+ `date3` DATE,
+ `time0` DATETIME,
+ `time1` TIME,
+ `datetime0` DATETIME,
+ `datetime1` STRING,
+ `zzz` STRING
+) DEFAULT NULL AS ColumnDataset;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.001.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.001.update.sqlpp
new file mode 100644
index 0000000..8e4c4cb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.001.update.sqlpp
@@ -0,0 +1,1050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO RowDataset (
+[
+  {
+
+      "int0": null,
+      "bool3": null,
+      "time1": "02:05:25",
+      "bool2": false,
+      "int2": -4,
+      "int1": -6,
+      "str3": "e",
+      "int3": 13,
+      "str1": "CLOCKS",
+      "str2": "two",
+      "time0": "1900-01-01T13:48:48",
+      "num1": "6.71",
+      "datetime0": "2004-07-26T12:30:34",
+      "num0": "-12.3",
+      "datetime1": null,
+      "num4": "10.85",
+      "key": "key01",
+      "num3": "-9.31",
+      "bool1": true,
+      "num2": "16.73",
+      "bool0": false,
+      "str0": "FURNITURE",
+      "date3": null,
+      "date2": "1995-09-03",
+      "date1": "2004-04-02",
+      "date0": "1972-07-04",
+      "zzz": "b"
+  },
+  {
+
+      "int0": null,
+      "bool3": true,
+      "time1": "04:48:07",
+      "bool2": true,
+      "int2": 0,
+      "int1": null,
+      "str3": null,
+      "int3": 11,
+      "str1": "CORDED KEYBOARDS",
+      "str2": null,
+      "time0": "1900-01-01T13:53:46",
+      "num1": "12.05",
+      "datetime0": "2004-07-17T14:01:56",
+      "num0": null,
+      "datetime1": null,
+      "num4": "3.38",
+      "key": "key12",
+      "num3": "-6.62",
+      "bool1": false,
+      "num2": null,
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "2001-02-04",
+      "date1": "2004-04-13",
+      "date0": null,
+      "zzz": "m"
+  },
+  {
+
+      "int0": 11,
+      "bool3": null,
+      "time1": "18:58:41",
+      "bool2": true,
+      "int2": -8,
+      "int1": null,
+      "str3": "e",
+      "int3": 18,
+      "str1": "DOT MATRIX PRINTERS",
+      "str2": "fifteen",
+      "time0": "1899-12-30T22:42:43",
+      "num1": "7.1",
+      "datetime0": "2004-07-31T11:57:52",
+      "num0": null,
+      "datetime1": null,
+      "num4": "-14.21",
+      "key": "key14",
+      "num3": "6.84",
+      "bool1": false,
+      "num2": null,
+      "bool0": true,
+      "str0": "TECHNOLOGY",
+      "date3": "1986-11-08",
+      "date2": "1972-07-12",
+      "date1": "2004-04-15",
+      "date0": null,
+      "zzz": "o"
+  },
+  {
+
+      "int0": 4,
+      "bool3": true,
+      "time1": null,
+      "bool2": false,
+      "int2": -9,
+      "int1": null,
+      "str3": "e",
+      "int3": 11,
+      "str1": "DVD",
+      "str2": "sixteen",
+      "time0": "1899-12-30T22:24:08",
+      "num1": "16.81",
+      "datetime0": "2004-07-14T07:43:00",
+      "num0": null,
+      "datetime1": null,
+      "num4": "6.75",
+      "key": "key15",
+      "num3": "-10.98",
+      "bool1": null,
+      "num2": "10.98",
+      "bool0": false,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "1995-06-04",
+      "date1": "2004-04-16",
+      "date0": null,
+      "zzz": "p"
+  },
+  {
+
+      "int0": null,
+      "bool3": null,
+      "time1": "09:33:31",
+      "bool2": false,
+      "int2": 5,
+      "int1": null,
+      "str3": "e",
+      "int3": 2,
+      "str1": "AIR PURIFIERS",
+      "str2": "three",
+      "time0": "1900-01-01T18:21:08",
+      "num1": "9.78",
+      "datetime0": "2004-08-02T07:59:23",
+      "num0": "15.7",
+      "datetime1": null,
+      "num4": "-13.47",
+      "key": "key02",
+      "num3": "-12.17",
+      "bool1": true,
+      "num2": null,
+      "bool0": null,
+      "str0": "OFFICE SUPPLIES",
+      "date3": "1997-02-02",
+      "date2": "1997-09-19",
+      "date1": "2004-04-03",
+      "date0": "1975-11-12",
+      "zzz": "c"
+  },
+  {
+
+      "int0": null,
+      "bool3": null,
+      "time1": "22:50:16",
+      "bool2": false,
+      "int2": -5,
+      "int1": -4,
+      "str3": "e",
+      "int3": 5,
+      "str1": "BINDER ACCESSORIES",
+      "str2": null,
+      "time0": "1900-01-01T18:51:48",
+      "num1": "7.43",
+      "datetime0": "2004-07-05T13:14:20",
+      "num0": "-15.7",
+      "datetime1": null,
+      "num4": "-6.05",
+      "key": "key03",
+      "num3": "-7.25",
+      "bool1": false,
+      "num2": "8.51",
+      "bool0": true,
+      "str0": "OFFICE SUPPLIES",
+      "date3": null,
+      "date2": "1980-07-26",
+      "date1": "2004-04-04",
+      "date0": "2004-06-04",
+      "zzz": "d"
+  },
+  {
+
+      "int0": 3,
+      "bool3": false,
+      "time1": "19:57:33",
+      "bool2": true,
+      "int2": 2,
+      "int1": null,
+      "str3": null,
+      "int3": 7,
+      "str1": "BINDING MACHINES",
+      "str2": "six",
+      "time0": "1900-01-01T08:59:39",
+      "num1": "9.38",
+      "datetime0": "2004-07-22T00:30:23",
+      "num0": "-3.5",
+      "datetime1": null,
+      "num4": "10.71",
+      "key": "key05",
+      "num3": "-19.96",
+      "bool1": false,
+      "num2": "8.98",
+      "bool0": null,
+      "str0": "OFFICE SUPPLIES",
+      "date3": "1979-04-01",
+      "date2": "1980-11-07",
+      "date1": "2004-04-06",
+      "date0": null,
+      "zzz": "f"
+  },
+  {
+
+      "int0": null,
+      "bool3": false,
+      "time1": "19:48:23",
+      "bool2": true,
+      "int2": 0,
+      "int1": 2,
+      "str3": "e",
+      "int3": 3,
+      "str1": "BUSINESS ENVELOPES",
+      "str2": "eight",
+      "time0": "1900-01-01T19:45:54",
+      "num1": "11.38",
+      "datetime0": "2004-07-12T17:30:16",
+      "num0": null,
+      "datetime1": null,
+      "num4": "-10.24",
+      "key": "key07",
+      "num3": "3.64",
+      "bool1": null,
+      "num2": "17.25",
+      "bool0": false,
+      "str0": "OFFICE SUPPLIES",
+      "date3": null,
+      "date2": "1974-05-03",
+      "date1": "2004-04-08",
+      "date0": null,
+      "zzz": "h"
+  },
+  {
+
+      "int0": null,
+      "bool3": false,
+      "time1": "22:20:14",
+      "bool2": false,
+      "int2": -6,
+      "int1": 3,
+      "str3": null,
+      "int3": 17,
+      "str1": "ANSWERING MACHINES",
+      "str2": "nine",
+      "time0": "1900-01-01T09:00:59",
+      "num1": "9.47",
+      "datetime0": "2004-07-04T22:49:28",
+      "num0": "10",
+      "datetime1": null,
+      "num4": "4.77",
+      "key": "key08",
+      "num3": "-13.38",
+      "bool1": null,
+      "num2": null,
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": "1983-05-22",
+      "date2": "1976-09-09",
+      "date1": "2004-04-09",
+      "date0": null,
+      "zzz": "i"
+  },
+  {
+
+      "int0": 1,
+      "bool3": true,
+      "time1": "19:36:22",
+      "bool2": false,
+      "int2": 5,
+      "int1": -3,
+      "str3": "e",
+      "int3": 8,
+      "str1": "CLAMP ON LAMPS",
+      "str2": "one",
+      "time0": "1899-12-30T21:07:32",
+      "num1": "8.42",
+      "datetime0": "2004-07-09T10:17:35",
+      "num0": "12.3",
+      "datetime1": null,
+      "num4": null,
+      "key": "key00",
+      "num3": "-11.52",
+      "bool1": true,
+      "num2": "17.86",
+      "bool0": true,
+      "str0": "FURNITURE",
+      "date3": "1986-03-20",
+      "date2": "1977-04-20",
+      "date1": "2004-04-01",
+      "date0": "2004-04-15",
+      "zzz": "a"
+  },
+  {
+
+      "int0": 4,
+      "bool3": null,
+      "time1": "00:05:57",
+      "bool2": false,
+      "int2": -3,
+      "int1": null,
+      "str3": "e",
+      "int3": 11,
+      "str1": "CD-R MEDIA",
+      "str2": "eleven",
+      "time0": "1900-01-01T01:31:32",
+      "num1": "10.32",
+      "datetime0": "2004-07-14T08:16:44",
+      "num0": null,
+      "datetime1": null,
+      "num4": "19.39",
+      "key": "key10",
+      "num3": "-4.79",
+      "bool1": true,
+      "num2": "6.8",
+      "bool0": true,
+      "str0": "TECHNOLOGY",
+      "date3": "1999-08-20",
+      "date2": "1974-03-17",
+      "date1": "2004-04-11",
+      "date0": null,
+      "zzz": "k"
+  },
+  {
+
+      "int0": 10,
+      "bool3": null,
+      "time1": "04:40:49",
+      "bool2": true,
+      "int2": -4,
+      "int1": -8,
+      "str3": null,
+      "int3": 2,
+      "str1": "CONFERENCE PHONES",
+      "str2": "twelve",
+      "time0": "1899-12-30T22:15:40",
+      "num1": "2.47",
+      "datetime0": "2004-07-25T15:22:26",
+      "num0": null,
+      "datetime1": null,
+      "num4": "3.82",
+      "key": "key11",
+      "num3": "-10.81",
+      "bool1": true,
+      "num2": "3.79",
+      "bool0": false,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "1994-04-20",
+      "date1": "2004-04-12",
+      "date0": null,
+      "zzz": "l"
+  },
+  {
+
+      "int0": 4,
+      "bool3": true,
+      "time1": null,
+      "bool2": true,
+      "int2": 4,
+      "int1": null,
+      "str3": null,
+      "int3": 18,
+      "str1": "CORDLESS KEYBOARDS",
+      "str2": "fourteen",
+      "time0": "1900-01-01T04:57:51",
+      "num1": "10.37",
+      "datetime0": "2004-07-19T22:21:31",
+      "num0": null,
+      "datetime1": null,
+      "num4": null,
+      "key": "key13",
+      "num3": "-18.43",
+      "bool1": false,
+      "num2": "13.04",
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": "1996-05-13",
+      "date2": "1988-01-05",
+      "date1": "2004-04-14",
+      "date0": null,
+      "zzz": "n"
+  },
+  {
+
+      "int0": 8,
+      "bool3": null,
+      "time1": "12:33:57",
+      "bool2": false,
+      "int2": 6,
+      "int1": -9,
+      "str3": null,
+      "int3": 0,
+      "str1": "ERICSSON",
+      "str2": null,
+      "time0": "1900-01-01T11:58:29",
+      "num1": "7.12",
+      "datetime0": "2004-07-28T12:34:28",
+      "num0": null,
+      "datetime1": null,
+      "num4": null,
+      "key": "key16",
+      "num3": "-2.6",
+      "bool1": null,
+      "num2": "7.87",
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": "1992-01-18",
+      "date2": "2002-04-27",
+      "date1": "2004-04-17",
+      "date0": null,
+      "zzz": "q"
+  },
+  {
+
+      "int0": 7,
+      "bool3": true,
+      "time1": null,
+      "bool2": true,
+      "int2": 3,
+      "int1": null,
+      "str3": null,
+      "int3": 9,
+      "str1": "BINDER CLIPS",
+      "str2": "five",
+      "time0": "1900-01-01T15:01:19",
+      "num1": "9.05",
+      "datetime0": "2004-07-28T23:30:22",
+      "num0": "3.5",
+      "datetime1": null,
+      "num4": "8.32",
+      "key": "key04",
+      "num3": "12.93",
+      "bool1": false,
+      "num2": "6.46",
+      "bool0": false,
+      "str0": "OFFICE SUPPLIES",
+      "date3": "1996-03-07",
+      "date2": "1997-05-30",
+      "date1": "2004-04-05",
+      "date0": "2004-06-19",
+      "zzz": "e"
+  },
+  {
+
+      "int0": 8,
+      "bool3": null,
+      "time1": null,
+      "bool2": false,
+      "int2": 9,
+      "int1": null,
+      "str3": "e",
+      "int3": 18,
+      "str1": "BINDING SUPPLIES",
+      "str2": null,
+      "time0": "1900-01-01T07:37:48",
+      "num1": "16.42",
+      "datetime0": "2004-07-28T06:54:50",
+      "num0": "0",
+      "datetime1": null,
+      "num4": null,
+      "key": "key06",
+      "num3": "10.93",
+      "bool1": null,
+      "num2": "11.69",
+      "bool0": true,
+      "str0": "OFFICE SUPPLIES",
+      "date3": null,
+      "date2": "1977-02-08",
+      "date1": "2004-04-07",
+      "date0": null,
+      "zzz": "g"
+  },
+  {
+
+      "int0": 8,
+      "bool3": null,
+      "time1": null,
+      "bool2": false,
+      "int2": -9,
+      "int1": 3,
+      "str3": "e",
+      "int3": 2,
+      "str1": "BUSINESS COPIERS",
+      "str2": "ten",
+      "time0": "1900-01-01T20:36:00",
+      "num1": "12.4",
+      "datetime0": "2004-07-23T21:13:37",
+      "num0": null,
+      "datetime1": null,
+      "num4": null,
+      "key": "key09",
+      "num3": "-10.56",
+      "bool1": true,
+      "num2": "11.5",
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "1998-08-12",
+      "date1": "2004-04-10",
+      "date0": null,
+      "zzz": "j"
+    }
+]
+);
+
+INSERT INTO ColumnDataset (
+[
+  {
+
+      "int0": null,
+      "bool3": null,
+      "time1": "02:05:25",
+      "bool2": false,
+      "int2": -4,
+      "int1": -6,
+      "str3": "e",
+      "int3": 13,
+      "str1": "CLOCKS",
+      "str2": "two",
+      "time0": "1900-01-01T13:48:48",
+      "num1": "6.71",
+      "datetime0": "2004-07-26T12:30:34",
+      "num0": "-12.3",
+      "datetime1": null,
+      "num4": "10.85",
+      "key": "key01",
+      "num3": "-9.31",
+      "bool1": true,
+      "num2": "16.73",
+      "bool0": false,
+      "str0": "FURNITURE",
+      "date3": null,
+      "date2": "1995-09-03",
+      "date1": "2004-04-02",
+      "date0": "1972-07-04",
+      "zzz": "b"
+  },
+  {
+
+      "int0": null,
+      "bool3": true,
+      "time1": "04:48:07",
+      "bool2": true,
+      "int2": 0,
+      "int1": null,
+      "str3": null,
+      "int3": 11,
+      "str1": "CORDED KEYBOARDS",
+      "str2": null,
+      "time0": "1900-01-01T13:53:46",
+      "num1": "12.05",
+      "datetime0": "2004-07-17T14:01:56",
+      "num0": null,
+      "datetime1": null,
+      "num4": "3.38",
+      "key": "key12",
+      "num3": "-6.62",
+      "bool1": false,
+      "num2": null,
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "2001-02-04",
+      "date1": "2004-04-13",
+      "date0": null,
+      "zzz": "m"
+  },
+  {
+
+      "int0": 11,
+      "bool3": null,
+      "time1": "18:58:41",
+      "bool2": true,
+      "int2": -8,
+      "int1": null,
+      "str3": "e",
+      "int3": 18,
+      "str1": "DOT MATRIX PRINTERS",
+      "str2": "fifteen",
+      "time0": "1899-12-30T22:42:43",
+      "num1": "7.1",
+      "datetime0": "2004-07-31T11:57:52",
+      "num0": null,
+      "datetime1": null,
+      "num4": "-14.21",
+      "key": "key14",
+      "num3": "6.84",
+      "bool1": false,
+      "num2": null,
+      "bool0": true,
+      "str0": "TECHNOLOGY",
+      "date3": "1986-11-08",
+      "date2": "1972-07-12",
+      "date1": "2004-04-15",
+      "date0": null,
+      "zzz": "o"
+  },
+  {
+
+      "int0": 4,
+      "bool3": true,
+      "time1": null,
+      "bool2": false,
+      "int2": -9,
+      "int1": null,
+      "str3": "e",
+      "int3": 11,
+      "str1": "DVD",
+      "str2": "sixteen",
+      "time0": "1899-12-30T22:24:08",
+      "num1": "16.81",
+      "datetime0": "2004-07-14T07:43:00",
+      "num0": null,
+      "datetime1": null,
+      "num4": "6.75",
+      "key": "key15",
+      "num3": "-10.98",
+      "bool1": null,
+      "num2": "10.98",
+      "bool0": false,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "1995-06-04",
+      "date1": "2004-04-16",
+      "date0": null,
+      "zzz": "p"
+  },
+  {
+
+      "int0": null,
+      "bool3": null,
+      "time1": "09:33:31",
+      "bool2": false,
+      "int2": 5,
+      "int1": null,
+      "str3": "e",
+      "int3": 2,
+      "str1": "AIR PURIFIERS",
+      "str2": "three",
+      "time0": "1900-01-01T18:21:08",
+      "num1": "9.78",
+      "datetime0": "2004-08-02T07:59:23",
+      "num0": "15.7",
+      "datetime1": null,
+      "num4": "-13.47",
+      "key": "key02",
+      "num3": "-12.17",
+      "bool1": true,
+      "num2": null,
+      "bool0": null,
+      "str0": "OFFICE SUPPLIES",
+      "date3": "1997-02-02",
+      "date2": "1997-09-19",
+      "date1": "2004-04-03",
+      "date0": "1975-11-12",
+      "zzz": "c"
+  },
+  {
+
+      "int0": null,
+      "bool3": null,
+      "time1": "22:50:16",
+      "bool2": false,
+      "int2": -5,
+      "int1": -4,
+      "str3": "e",
+      "int3": 5,
+      "str1": "BINDER ACCESSORIES",
+      "str2": null,
+      "time0": "1900-01-01T18:51:48",
+      "num1": "7.43",
+      "datetime0": "2004-07-05T13:14:20",
+      "num0": "-15.7",
+      "datetime1": null,
+      "num4": "-6.05",
+      "key": "key03",
+      "num3": "-7.25",
+      "bool1": false,
+      "num2": "8.51",
+      "bool0": true,
+      "str0": "OFFICE SUPPLIES",
+      "date3": null,
+      "date2": "1980-07-26",
+      "date1": "2004-04-04",
+      "date0": "2004-06-04",
+      "zzz": "d"
+  },
+  {
+
+      "int0": 3,
+      "bool3": false,
+      "time1": "19:57:33",
+      "bool2": true,
+      "int2": 2,
+      "int1": null,
+      "str3": null,
+      "int3": 7,
+      "str1": "BINDING MACHINES",
+      "str2": "six",
+      "time0": "1900-01-01T08:59:39",
+      "num1": "9.38",
+      "datetime0": "2004-07-22T00:30:23",
+      "num0": "-3.5",
+      "datetime1": null,
+      "num4": "10.71",
+      "key": "key05",
+      "num3": "-19.96",
+      "bool1": false,
+      "num2": "8.98",
+      "bool0": null,
+      "str0": "OFFICE SUPPLIES",
+      "date3": "1979-04-01",
+      "date2": "1980-11-07",
+      "date1": "2004-04-06",
+      "date0": null,
+      "zzz": "f"
+  },
+  {
+
+      "int0": null,
+      "bool3": false,
+      "time1": "19:48:23",
+      "bool2": true,
+      "int2": 0,
+      "int1": 2,
+      "str3": "e",
+      "int3": 3,
+      "str1": "BUSINESS ENVELOPES",
+      "str2": "eight",
+      "time0": "1900-01-01T19:45:54",
+      "num1": "11.38",
+      "datetime0": "2004-07-12T17:30:16",
+      "num0": null,
+      "datetime1": null,
+      "num4": "-10.24",
+      "key": "key07",
+      "num3": "3.64",
+      "bool1": null,
+      "num2": "17.25",
+      "bool0": false,
+      "str0": "OFFICE SUPPLIES",
+      "date3": null,
+      "date2": "1974-05-03",
+      "date1": "2004-04-08",
+      "date0": null,
+      "zzz": "h"
+  },
+  {
+
+      "int0": null,
+      "bool3": false,
+      "time1": "22:20:14",
+      "bool2": false,
+      "int2": -6,
+      "int1": 3,
+      "str3": null,
+      "int3": 17,
+      "str1": "ANSWERING MACHINES",
+      "str2": "nine",
+      "time0": "1900-01-01T09:00:59",
+      "num1": "9.47",
+      "datetime0": "2004-07-04T22:49:28",
+      "num0": "10",
+      "datetime1": null,
+      "num4": "4.77",
+      "key": "key08",
+      "num3": "-13.38",
+      "bool1": null,
+      "num2": null,
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": "1983-05-22",
+      "date2": "1976-09-09",
+      "date1": "2004-04-09",
+      "date0": null,
+      "zzz": "i"
+  },
+  {
+
+      "int0": 1,
+      "bool3": true,
+      "time1": "19:36:22",
+      "bool2": false,
+      "int2": 5,
+      "int1": -3,
+      "str3": "e",
+      "int3": 8,
+      "str1": "CLAMP ON LAMPS",
+      "str2": "one",
+      "time0": "1899-12-30T21:07:32",
+      "num1": "8.42",
+      "datetime0": "2004-07-09T10:17:35",
+      "num0": "12.3",
+      "datetime1": null,
+      "num4": null,
+      "key": "key00",
+      "num3": "-11.52",
+      "bool1": true,
+      "num2": "17.86",
+      "bool0": true,
+      "str0": "FURNITURE",
+      "date3": "1986-03-20",
+      "date2": "1977-04-20",
+      "date1": "2004-04-01",
+      "date0": "2004-04-15",
+      "zzz": "a"
+  },
+  {
+
+      "int0": 4,
+      "bool3": null,
+      "time1": "00:05:57",
+      "bool2": false,
+      "int2": -3,
+      "int1": null,
+      "str3": "e",
+      "int3": 11,
+      "str1": "CD-R MEDIA",
+      "str2": "eleven",
+      "time0": "1900-01-01T01:31:32",
+      "num1": "10.32",
+      "datetime0": "2004-07-14T08:16:44",
+      "num0": null,
+      "datetime1": null,
+      "num4": "19.39",
+      "key": "key10",
+      "num3": "-4.79",
+      "bool1": true,
+      "num2": "6.8",
+      "bool0": true,
+      "str0": "TECHNOLOGY",
+      "date3": "1999-08-20",
+      "date2": "1974-03-17",
+      "date1": "2004-04-11",
+      "date0": null,
+      "zzz": "k"
+  },
+  {
+
+      "int0": 10,
+      "bool3": null,
+      "time1": "04:40:49",
+      "bool2": true,
+      "int2": -4,
+      "int1": -8,
+      "str3": null,
+      "int3": 2,
+      "str1": "CONFERENCE PHONES",
+      "str2": "twelve",
+      "time0": "1899-12-30T22:15:40",
+      "num1": "2.47",
+      "datetime0": "2004-07-25T15:22:26",
+      "num0": null,
+      "datetime1": null,
+      "num4": "3.82",
+      "key": "key11",
+      "num3": "-10.81",
+      "bool1": true,
+      "num2": "3.79",
+      "bool0": false,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "1994-04-20",
+      "date1": "2004-04-12",
+      "date0": null,
+      "zzz": "l"
+  },
+  {
+
+      "int0": 4,
+      "bool3": true,
+      "time1": null,
+      "bool2": true,
+      "int2": 4,
+      "int1": null,
+      "str3": null,
+      "int3": 18,
+      "str1": "CORDLESS KEYBOARDS",
+      "str2": "fourteen",
+      "time0": "1900-01-01T04:57:51",
+      "num1": "10.37",
+      "datetime0": "2004-07-19T22:21:31",
+      "num0": null,
+      "datetime1": null,
+      "num4": null,
+      "key": "key13",
+      "num3": "-18.43",
+      "bool1": false,
+      "num2": "13.04",
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": "1996-05-13",
+      "date2": "1988-01-05",
+      "date1": "2004-04-14",
+      "date0": null,
+      "zzz": "n"
+  },
+  {
+
+      "int0": 8,
+      "bool3": null,
+      "time1": "12:33:57",
+      "bool2": false,
+      "int2": 6,
+      "int1": -9,
+      "str3": null,
+      "int3": 0,
+      "str1": "ERICSSON",
+      "str2": null,
+      "time0": "1900-01-01T11:58:29",
+      "num1": "7.12",
+      "datetime0": "2004-07-28T12:34:28",
+      "num0": null,
+      "datetime1": null,
+      "num4": null,
+      "key": "key16",
+      "num3": "-2.6",
+      "bool1": null,
+      "num2": "7.87",
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": "1992-01-18",
+      "date2": "2002-04-27",
+      "date1": "2004-04-17",
+      "date0": null,
+      "zzz": "q"
+  },
+  {
+
+      "int0": 7,
+      "bool3": true,
+      "time1": null,
+      "bool2": true,
+      "int2": 3,
+      "int1": null,
+      "str3": null,
+      "int3": 9,
+      "str1": "BINDER CLIPS",
+      "str2": "five",
+      "time0": "1900-01-01T15:01:19",
+      "num1": "9.05",
+      "datetime0": "2004-07-28T23:30:22",
+      "num0": "3.5",
+      "datetime1": null,
+      "num4": "8.32",
+      "key": "key04",
+      "num3": "12.93",
+      "bool1": false,
+      "num2": "6.46",
+      "bool0": false,
+      "str0": "OFFICE SUPPLIES",
+      "date3": "1996-03-07",
+      "date2": "1997-05-30",
+      "date1": "2004-04-05",
+      "date0": "2004-06-19",
+      "zzz": "e"
+  },
+  {
+
+      "int0": 8,
+      "bool3": null,
+      "time1": null,
+      "bool2": false,
+      "int2": 9,
+      "int1": null,
+      "str3": "e",
+      "int3": 18,
+      "str1": "BINDING SUPPLIES",
+      "str2": null,
+      "time0": "1900-01-01T07:37:48",
+      "num1": "16.42",
+      "datetime0": "2004-07-28T06:54:50",
+      "num0": "0",
+      "datetime1": null,
+      "num4": null,
+      "key": "key06",
+      "num3": "10.93",
+      "bool1": null,
+      "num2": "11.69",
+      "bool0": true,
+      "str0": "OFFICE SUPPLIES",
+      "date3": null,
+      "date2": "1977-02-08",
+      "date1": "2004-04-07",
+      "date0": null,
+      "zzz": "g"
+  },
+  {
+
+      "int0": 8,
+      "bool3": null,
+      "time1": null,
+      "bool2": false,
+      "int2": -9,
+      "int1": 3,
+      "str3": "e",
+      "int3": 2,
+      "str1": "BUSINESS COPIERS",
+      "str2": "ten",
+      "time0": "1900-01-01T20:36:00",
+      "num1": "12.4",
+      "datetime0": "2004-07-23T21:13:37",
+      "num0": null,
+      "datetime1": null,
+      "num4": null,
+      "key": "key09",
+      "num3": "-10.56",
+      "bool1": true,
+      "num2": "11.5",
+      "bool0": null,
+      "str0": "TECHNOLOGY",
+      "date3": null,
+      "date2": "1998-08-12",
+      "date1": "2004-04-10",
+      "date0": null,
+      "zzz": "j"
+    }
+]
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.010.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.010.query.sqlpp
new file mode 100644
index 0000000..424222a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.010.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param sql-compat:json=true
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT v.str2 AS str2,
+       SUM(v.num3) AS `sum:num3:ok`
+FROM RowView v
+WHERE ((v.str2 IN ('sixteen')) OR (v.str2 IS UNKNOWN))
+GROUP BY v.str2
+ORDER BY v.str2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.011.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.011.query.sqlpp
new file mode 100644
index 0000000..ed60c99
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.011.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param sql-compat:json=true
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT v.str2 AS str2,
+       SUM(v.num3) AS `sum:num3:ok`
+FROM ColumnView v
+WHERE ((v.str2 IN ('sixteen')) OR (v.str2 IS UNKNOWN))
+GROUP BY v.str2
+ORDER BY v.str2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.012.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.012.query.sqlpp
new file mode 100644
index 0000000..086ef86
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.012.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param sql-compat:json=true
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT v.str2 AS str2,
+       SUM(v.num3) AS `sum:num3:ok`
+FROM ColumnView v
+WHERE ((v.str2 IN ('sixteen')) OR (v.str2 IS UNKNOWN))
+GROUP BY v.str2
+ORDER BY v.str2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.013.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.013.query.sqlpp
new file mode 100644
index 0000000..ebeb200
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/sql-compat/sql-compat.013.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// requesttype=application/json
+// param sql-compat:json=true
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT v.str2 AS str2,
+       SUM(v.num3) AS `sum:num3:ok`
+FROM ColumnView v
+WHERE ((v.str2 IN ('sixteen')) OR (v.str2 IS UNKNOWN))
+GROUP BY v.str2
+ORDER BY v.str2;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.000.ddl.sqlpp
new file mode 100644
index 0000000..3792376
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.000.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE ColumnType AS {
+    id: int
+};
+
+CREATE DATASET ColumnDataset(ColumnType)
+PRIMARY KEY id WITH {
+    "storage-format": {"format" : "column"}
+};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.001.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.001.update.sqlpp
new file mode 100644
index 0000000..a71f6d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.001.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+LOAD DATASET ColumnDataset USING localfs
+(
+    ("path" = "asterix_nc1://data/hdfs/parquet/heterogeneous_1.json, asterix_nc1://data/hdfs/parquet/heterogeneous_2.json"),
+    ("format" = "json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.010.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.010.query.sqlpp
new file mode 100644
index 0000000..b768fab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.010.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+   OR (SOME ao IN p.arrayOrObject SATISFIES ao.text = "1")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.011.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.011.query.sqlpp
new file mode 100644
index 0000000..ba21ba6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.011.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+   OR (SOME ao IN p.arrayOrObject SATISFIES ao.text = "1")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.012.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.012.query.sqlpp
new file mode 100644
index 0000000..471035e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.012.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+   OR (SOME ao IN p.arrayOrObject SATISFIES ao.text = "1")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.020.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.020.query.sqlpp
new file mode 100644
index 0000000..63ab2bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.020.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+   OR (EVERY ao IN p.arrayOrObject SATISFIES ao.text <= "2")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.021.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.021.query.sqlpp
new file mode 100644
index 0000000..f35eab0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.021.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+   OR (EVERY ao IN p.arrayOrObject SATISFIES ao.text <= "2")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.022.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.022.query.sqlpp
new file mode 100644
index 0000000..3d95947
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.022.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+   OR (EVERY ao IN p.arrayOrObject SATISFIES ao.text <= "2")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.030.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.030.query.sqlpp
new file mode 100644
index 0000000..5d5ef0b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.030.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+  AND EXISTS (
+                SELECT *
+                FROM ColumnDataset b2, b2.arrayOrObject ao
+                WHERE ao.text = "1"
+            )
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.031.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.031.query.sqlpp
new file mode 100644
index 0000000..559a597
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.031.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+  AND EXISTS (
+                SELECT *
+                FROM ColumnDataset b2, b2.arrayOrObject ao
+                WHERE ao.text = "1"
+            )
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.032.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.032.query.sqlpp
new file mode 100644
index 0000000..3d41ef6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.032.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+  AND EXISTS (
+                SELECT *
+                FROM ColumnDataset b2, b2.arrayOrObject ao
+                WHERE ao.text = "1"
+            )
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.040.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.040.query.sqlpp
new file mode 100644
index 0000000..686c2a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.040.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+  OR EXISTS (
+                SELECT *
+                FROM ColumnDataset b2, b2.arrayOrObject ao
+                WHERE ao.text = "1"
+            )
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.041.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.041.query.sqlpp
new file mode 100644
index 0000000..2a1a011
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.041.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+  OR EXISTS (
+                SELECT *
+                FROM ColumnDataset b2, b2.arrayOrObject ao
+                WHERE ao.text = "1"
+            )
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.042.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.042.query.sqlpp
new file mode 100644
index 0000000..7b30a12
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.042.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE COUNT(*)
+FROM ColumnDataset p
+WHERE p.arrayOrObject.text = "7"
+  OR EXISTS (
+                SELECT *
+                FROM ColumnDataset b2, b2.arrayOrObject ao
+                WHERE ao.text = "1"
+            )
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.050.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.050.query.sqlpp
new file mode 100644
index 0000000..752e468
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.050.query.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT VALUE SUM(array_count(X))
+FROM ColumnDataset p
+WITH X AS (
+        SELECT VALUE 1
+        FROM p.arrayOrObject ao
+        -- The filter should NOT be pushed!
+        WHERE ao.text >= "1"
+          AND ao.text <= "2"
+)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.051.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.051.query.sqlpp
new file mode 100644
index 0000000..d1b1ba4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.051.query.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE SUM(array_count(X))
+FROM ColumnDataset p
+WITH X AS (
+        SELECT VALUE 1
+        FROM p.arrayOrObject ao
+        -- The filter should NOT be pushed!
+        WHERE ao.text >= "1"
+          AND ao.text <= "2"
+)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.052.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.052.query.sqlpp
new file mode 100644
index 0000000..c569cbf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.052.query.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE SUM(array_count(X))
+FROM ColumnDataset p
+WITH X AS (
+        SELECT VALUE 1
+        FROM p.arrayOrObject ao
+        -- The filter should NOT be pushed!
+        WHERE ao.text >= "1"
+          AND ao.text <= "2"
+)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.060.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.060.query.sqlpp
new file mode 100644
index 0000000..0681ee7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.060.query.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "false";
+
+
+
+SELECT VALUE array_count(X)
+FROM ColumnDataset p
+WITH X AS (
+        SELECT VALUE 1
+        FROM ColumnDataset p2, p2.arrayOrObject ao
+        -- This filter can be pushed as we are scanning a dataset
+        WHERE ao.text >= "1"
+          AND ao.text <= "2"
+)
+WHERE p.arrayOrObject.text = "7"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.061.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.061.query.sqlpp
new file mode 100644
index 0000000..51fb3d0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.061.query.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE array_count(X)
+FROM ColumnDataset p
+WITH X AS (
+        SELECT VALUE 1
+        FROM ColumnDataset p2, p2.arrayOrObject ao
+        -- This filter can be pushed as we are scanning a dataset
+        WHERE ao.text >= "1"
+          AND ao.text <= "2"
+)
+WHERE p.arrayOrObject.text = "7"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.062.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.062.query.sqlpp
new file mode 100644
index 0000000..893ca2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/subplan/subplan.062.query.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE array_count(X)
+FROM ColumnDataset p
+WITH X AS (
+        SELECT VALUE 1
+        FROM ColumnDataset p2, p2.arrayOrObject ao
+        -- This filter can be pushed as we are scanning a dataset
+        WHERE ao.text >= "1"
+          AND ao.text <= "2"
+)
+WHERE p.arrayOrObject.text = "7"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.010.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.010.adm
new file mode 100644
index 0000000..8836479
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.010.adm
@@ -0,0 +1,2 @@
+{ "str2": null, "sum:num3:ok": -5.540000000000001 }
+{ "str2": "sixteen", "sum:num3:ok": -10.98 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.011.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.011.adm
new file mode 100644
index 0000000..8836479
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.011.adm
@@ -0,0 +1,2 @@
+{ "str2": null, "sum:num3:ok": -5.540000000000001 }
+{ "str2": "sixteen", "sum:num3:ok": -10.98 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.012.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.012.adm
new file mode 100644
index 0000000..8836479
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.012.adm
@@ -0,0 +1,2 @@
+{ "str2": null, "sum:num3:ok": -5.540000000000001 }
+{ "str2": "sixteen", "sum:num3:ok": -10.98 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.013.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.013.plan
new file mode 100644
index 0000000..7c76122
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/sql-compat/sql-compat.013.plan
@@ -0,0 +1,63 @@
+distribute result [$$197] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$197]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$197] <- [{"str2": $$str2, "sum:num3:ok": $$201}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$str2(ASC) ]  |PARTITIONED|
+          group by ([$$str2 := $$233]) decor ([]) {
+                    aggregate [$$201] <- [agg-global-sql-sum($$232)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- AGGREGATE  |LOCAL|
+                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- SORT_GROUP_BY[$$233]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- HASH_PARTITION_EXCHANGE [$$233]  |PARTITIONED|
+              group by ([$$233 := $$231]) decor ([]) {
+                        aggregate [$$232] <- [agg-local-sql-sum($$195)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- AGGREGATE  |LOCAL|
+                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                     } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- SORT_GROUP_BY[$$231]  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  project ([$$195, $$231]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    assign [$$195] <- [double-default-null($$206)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$206, $$231]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        select (or($$184, is-unknown($$231))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |PARTITIONED|
+                          project ([$$206, $$231, $$184]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            subplan {
+                                      aggregate [$$184] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- AGGREGATE  |LOCAL|
+                                        select (eq(string-default-null($$210), "sixteen")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_SELECT  |LOCAL|
+                                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- SUBPLAN  |PARTITIONED|
+                              assign [$$231] <- [string-default-null($$210)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                project ([$$210, $$206]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  assign [$$210, $$206] <- [$#4.getField("str2"), $#4.getField("num3")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |PARTITIONED|
+                                    project ([$#4]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- STREAM_PROJECT  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        data-scan []<-[$$200, $#4] <- test.ColumnDataset project ({str2:any,num3:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.010.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.010.adm
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.010.adm
@@ -0,0 +1 @@
+2
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.011.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.011.adm
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.011.adm
@@ -0,0 +1 @@
+2
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.012.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.012.plan
new file mode 100644
index 0000000..1449b29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.012.plan
@@ -0,0 +1,39 @@
+distribute result [$$53] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$53] <- [agg-sql-sum($$58)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$58] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |PARTITIONED|
+          select (or(eq($$57.getField("text"), "7"), $$44)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_SELECT  |PARTITIONED|
+            subplan {
+                      aggregate [$$44] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- AGGREGATE  |LOCAL|
+                        select (eq($$55, "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |LOCAL|
+                          assign [$$55] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |LOCAL|
+                            unnest $$ao <- scan-collection($$57) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- UNNEST  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- SUBPLAN  |PARTITIONED|
+              project ([$$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$57] <- [$$p.getField("arrayOrObject")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$52, $$p] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$p.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$p.getField("arrayOrObject")).getField("text"), "1")) range-filter on: or(eq($$p.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$p.getField("arrayOrObject")).getField("text"), "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.020.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.020.adm
new file mode 100644
index 0000000..b8626c4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.020.adm
@@ -0,0 +1 @@
+4
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.021.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.021.adm
new file mode 100644
index 0000000..b8626c4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.021.adm
@@ -0,0 +1 @@
+4
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.022.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.022.plan
new file mode 100644
index 0000000..8d3fbd4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.022.plan
@@ -0,0 +1,39 @@
+distribute result [$$53] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$53] <- [agg-sql-sum($$58)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$58] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |PARTITIONED|
+          select (or(eq($$57.getField("text"), "7"), $$44)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_SELECT  |PARTITIONED|
+            subplan {
+                      aggregate [$$44] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- AGGREGATE  |LOCAL|
+                        select (not(if-missing-or-null(le($$55, "2"), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |LOCAL|
+                          assign [$$55] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |LOCAL|
+                            unnest $$ao <- scan-collection($$57) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- UNNEST  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- SUBPLAN  |PARTITIONED|
+              project ([$$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$57] <- [$$p.getField("arrayOrObject")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$52, $$p] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.030.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.030.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.030.adm
@@ -0,0 +1 @@
+1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.031.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.031.adm
new file mode 100644
index 0000000..d00491f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.031.adm
@@ -0,0 +1 @@
+1
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.032.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.032.plan
new file mode 100644
index 0000000..d00429a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.032.plan
@@ -0,0 +1,78 @@
+distribute result [$$70] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$70] <- [agg-sql-sum($$76)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$76] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- NESTED_LOOP  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (eq($$p.getField("arrayOrObject").getField("text"), "7")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    project ([$$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$p] <- [$$b2] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              project ([$$b2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) range-filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                project ([]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |UNPARTITIONED|
+                  select (neq($$69, 0)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |UNPARTITIONED|
+                    aggregate [$$69] <- [agg-sum($$75)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- AGGREGATE  |UNPARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                        aggregate [$$75] <- [agg-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- AGGREGATE  |PARTITIONED|
+                          select (eq($$ao.getField("text"), "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_SELECT  |PARTITIONED|
+                            project ([$$ao]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              unnest $$ao <- scan-collection($$72) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- UNNEST  |PARTITIONED|
+                                project ([$$72]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  assign [$$72] <- [$$b2.getField("arrayOrObject")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- REPLICATE  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          project ([$$b2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) range-filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.040.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.040.adm
new file mode 100644
index 0000000..1e8b314
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.040.adm
@@ -0,0 +1 @@
+6
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.041.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.041.adm
new file mode 100644
index 0000000..1e8b314
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.041.adm
@@ -0,0 +1 @@
+6
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.042.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.042.plan
new file mode 100644
index 0000000..0d2823e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.042.plan
@@ -0,0 +1,74 @@
+distribute result [$$70] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$70] <- [agg-sql-sum($$76)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$76] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (or(eq($$71, "7"), neq($$69, 0))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- NESTED_LOOP  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([$$71]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$71] <- [$$p.getField("arrayOrObject").getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$p] <- [$$b2] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              project ([$$b2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                aggregate [$$69] <- [agg-sum($$75)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- AGGREGATE  |UNPARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                    aggregate [$$75] <- [agg-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- AGGREGATE  |PARTITIONED|
+                      select (eq($$ao.getField("text"), "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        project ([$$ao]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          unnest $$ao <- scan-collection($$72) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- UNNEST  |PARTITIONED|
+                            project ([$$72]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              assign [$$72] <- [$$b2.getField("arrayOrObject")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ASSIGN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- REPLICATE  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      project ([$$b2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.050.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.050.adm
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.050.adm
@@ -0,0 +1 @@
+2
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.051.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.051.adm
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.051.adm
@@ -0,0 +1 @@
+2
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.052.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.052.plan
new file mode 100644
index 0000000..d70c8e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.052.plan
@@ -0,0 +1,39 @@
+distribute result [$$67] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$67] <- [agg-global-sql-sum($$71)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$71] <- [agg-local-sql-sum($$70)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |PARTITIONED|
+          project ([$$70]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            subplan {
+                      aggregate [$$70] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- AGGREGATE  |LOCAL|
+                        select (and(ge($$65, "1"), le($$65, "2"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |LOCAL|
+                          assign [$$65] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |LOCAL|
+                            unnest $$ao <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- UNNEST  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- SUBPLAN  |PARTITIONED|
+              project ([$$68]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$68] <- [$$p.getField("arrayOrObject")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$66, $$p] <- test.ColumnDataset project ({arrayOrObject:[{text:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.060.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.060.adm
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.060.adm
@@ -0,0 +1 @@
+2
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.061.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.061.adm
new file mode 100644
index 0000000..0cfbf08
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.061.adm
@@ -0,0 +1 @@
+2
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.062.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.062.plan
new file mode 100644
index 0000000..1c777bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/subplan/subplan.062.plan
@@ -0,0 +1,70 @@
+distribute result [$$63] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    join (true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- NESTED_LOOP  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        project ([]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          select (eq($$p.getField("arrayOrObject").getField("text"), "7")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_SELECT  |PARTITIONED|
+            project ([$$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              assign [$$p] <- [$$p2] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ASSIGN  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- REPLICATE  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      project ([$$p2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$62, $$p2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) range-filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- BROADCAST_EXCHANGE  |PARTITIONED|
+        aggregate [$$63] <- [agg-sql-sum($$67)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$67] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (and(ge($$60, "1"), le($$60, "2"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$60]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$60] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$ao]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      unnest $$ao <- scan-collection($$65) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- UNNEST  |PARTITIONED|
+                        project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$65] <- [$$p2.getField("arrayOrObject")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              replicate [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- REPLICATE  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$p2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      data-scan []<-[$$62, $$p2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) range-filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.012.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.012.plan
new file mode 100644
index 0000000..81a8ea1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.012.plan
@@ -0,0 +1,39 @@
+distribute result [$$53] [cardinality: 2.0, op-cost: 0.0, total-cost: 6.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 6.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$53] <- [agg-sql-sum($$58)] [cardinality: 2.0, op-cost: 0.0, total-cost: 6.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 6.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$58] <- [agg-sql-count(1)] [cardinality: 2.0, op-cost: 0.0, total-cost: 6.0]
+        -- AGGREGATE  |PARTITIONED|
+          select (or(eq($$57.getField("text"), "7"), $$44)) [cardinality: 2.0, op-cost: 0.0, total-cost: 6.0]
+          -- STREAM_SELECT  |PARTITIONED|
+            subplan {
+                      aggregate [$$44] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- AGGREGATE  |LOCAL|
+                        select (eq($$55, "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |LOCAL|
+                          assign [$$55] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |LOCAL|
+                            unnest $$ao <- scan-collection($$57) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- UNNEST  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                   } [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+            -- SUBPLAN  |PARTITIONED|
+              project ([$$57]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$57] <- [$$p.getField("arrayOrObject")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$p]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$52, $$p] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$p.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$p.getField("arrayOrObject")).getField("text"), "1")) range-filter on: or(eq($$p.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$p.getField("arrayOrObject")).getField("text"), "1")) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.022.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.022.plan
new file mode 100644
index 0000000..7f6ceee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.022.plan
@@ -0,0 +1,39 @@
+distribute result [$$53] [cardinality: 4.0, op-cost: 0.0, total-cost: 6.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 4.0, op-cost: 0.0, total-cost: 6.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$53] <- [agg-sql-sum($$58)] [cardinality: 4.0, op-cost: 0.0, total-cost: 6.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 4.0, op-cost: 0.0, total-cost: 6.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$58] <- [agg-sql-count(1)] [cardinality: 4.0, op-cost: 0.0, total-cost: 6.0]
+        -- AGGREGATE  |PARTITIONED|
+          select (or(eq($$57.getField("text"), "7"), $$44)) [cardinality: 4.0, op-cost: 0.0, total-cost: 6.0]
+          -- STREAM_SELECT  |PARTITIONED|
+            subplan {
+                      aggregate [$$44] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- AGGREGATE  |LOCAL|
+                        select (not(if-missing-or-null(le($$55, "2"), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |LOCAL|
+                          assign [$$55] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |LOCAL|
+                            unnest $$ao <- scan-collection($$57) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- UNNEST  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                   } [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+            -- SUBPLAN  |PARTITIONED|
+              project ([$$57]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$57] <- [$$p.getField("arrayOrObject")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$p]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$52, $$p] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.032.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.032.plan
new file mode 100644
index 0000000..c51972e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.032.plan
@@ -0,0 +1,78 @@
+distribute result [$$70] [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$70] <- [agg-sql-sum($$76)] [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$76] <- [agg-sql-count(1)] [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+        -- AGGREGATE  |PARTITIONED|
+          exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (true) [cardinality: 2.1, op-cost: 1.0, total-cost: 17.0]
+            -- NESTED_LOOP  |PARTITIONED|
+              exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([]) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  select (eq($$p.getField("arrayOrObject").getField("text"), "7")) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    project ([$$p]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$p] <- [$$b2] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              project ([$$b2]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) range-filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 1.0, op-cost: 4.0, total-cost: 10.0]
+              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                project ([]) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                -- STREAM_PROJECT  |UNPARTITIONED|
+                  select (neq($$69, 0)) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                  -- STREAM_SELECT  |UNPARTITIONED|
+                    aggregate [$$69] <- [agg-sum($$75)] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- AGGREGATE  |UNPARTITIONED|
+                      exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                        aggregate [$$75] <- [agg-count(1)] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                        -- AGGREGATE  |PARTITIONED|
+                          select (eq($$ao.getField("text"), "1")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_SELECT  |PARTITIONED|
+                            project ([$$ao]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              unnest $$ao <- scan-collection($$72) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                              -- UNNEST  |PARTITIONED|
+                                project ([$$72]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  assign [$$72] <- [$$b2.getField("arrayOrObject")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                  -- ASSIGN  |PARTITIONED|
+                                    exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      replicate [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                      -- REPLICATE  |PARTITIONED|
+                                        exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          project ([$$b2]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) range-filter on: or(eq($$b2.getField("arrayOrObject").getField("text"), "7"), eq(scan-collection($$b2.getField("arrayOrObject")).getField("text"), "1")) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.042.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.042.plan
new file mode 100644
index 0000000..9aa6fe5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.042.plan
@@ -0,0 +1,74 @@
+distribute result [$$70] [cardinality: 2.1, op-cost: 0.0, total-cost: 22.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 22.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$70] <- [agg-sql-sum($$76)] [cardinality: 2.1, op-cost: 0.0, total-cost: 22.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 22.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$76] <- [agg-sql-count(1)] [cardinality: 2.1, op-cost: 0.0, total-cost: 22.0]
+        -- AGGREGATE  |PARTITIONED|
+          exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 22.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            join (or(eq($$71, "7"), neq($$69, 0))) [cardinality: 2.1, op-cost: 6.0, total-cost: 22.0]
+            -- NESTED_LOOP  |PARTITIONED|
+              exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                project ([$$71]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$71] <- [$$p.getField("arrayOrObject").getField("text")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$p]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      assign [$$p] <- [$$b2] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                      -- ASSIGN  |PARTITIONED|
+                        exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          replicate [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                          -- REPLICATE  |PARTITIONED|
+                            exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              project ([$$b2]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                              -- STREAM_PROJECT  |PARTITIONED|
+                                exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+              exchange [cardinality: 1.0, op-cost: 4.0, total-cost: 10.0]
+              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                aggregate [$$69] <- [agg-sum($$75)] [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                -- AGGREGATE  |UNPARTITIONED|
+                  exchange [cardinality: 1.0, op-cost: 4.0, total-cost: 10.0]
+                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                    aggregate [$$75] <- [agg-count(1)] [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                    -- AGGREGATE  |PARTITIONED|
+                      select (eq($$ao.getField("text"), "1")) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+                      -- STREAM_SELECT  |PARTITIONED|
+                        project ([$$ao]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          unnest $$ao <- scan-collection($$72) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                          -- UNNEST  |PARTITIONED|
+                            project ([$$72]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              assign [$$72] <- [$$b2.getField("arrayOrObject")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                              -- ASSIGN  |PARTITIONED|
+                                exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  replicate [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                  -- REPLICATE  |PARTITIONED|
+                                    exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      project ([$$b2]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          data-scan []<-[$$68, $$b2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.052.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.052.plan
new file mode 100644
index 0000000..5840b49
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.052.plan
@@ -0,0 +1,39 @@
+distribute result [$$67] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    aggregate [$$67] <- [agg-global-sql-sum($$71)] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+    -- AGGREGATE  |UNPARTITIONED|
+      exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        aggregate [$$71] <- [agg-local-sql-sum($$70)] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+        -- AGGREGATE  |PARTITIONED|
+          project ([$$70]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            subplan {
+                      aggregate [$$70] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- AGGREGATE  |LOCAL|
+                        select (and(ge($$65, "1"), le($$65, "2"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_SELECT  |LOCAL|
+                          assign [$$65] <- [$$ao.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |LOCAL|
+                            unnest $$ao <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- UNNEST  |LOCAL|
+                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                   } [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+            -- SUBPLAN  |PARTITIONED|
+              project ([$$68]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$68] <- [$$p.getField("arrayOrObject")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$p]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$66, $$p] <- test.ColumnDataset project ({arrayOrObject:[{text:any}]}) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.062.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.062.plan
new file mode 100644
index 0000000..0af0063
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/subplan/subplan.062.plan
@@ -0,0 +1,70 @@
+distribute result [$$63] [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 17.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    join (true) [cardinality: 2.1, op-cost: 1.0, total-cost: 17.0]
+    -- NESTED_LOOP  |PARTITIONED|
+      exchange [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        project ([]) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          select (eq($$p.getField("arrayOrObject").getField("text"), "7")) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+          -- STREAM_SELECT  |PARTITIONED|
+            project ([$$p]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              assign [$$p] <- [$$p2] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+              -- ASSIGN  |PARTITIONED|
+                exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  replicate [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                  -- REPLICATE  |PARTITIONED|
+                    exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      project ([$$p2]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$62, $$p2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) range-filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      exchange [cardinality: 1.0, op-cost: 4.0, total-cost: 10.0]
+      -- BROADCAST_EXCHANGE  |PARTITIONED|
+        aggregate [$$63] <- [agg-sql-sum($$67)] [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 1.0, op-cost: 4.0, total-cost: 10.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$67] <- [agg-sql-count(1)] [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (and(ge($$60, "1"), le($$60, "2"))) [cardinality: 1.0, op-cost: 0.0, total-cost: 6.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$60]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  assign [$$60] <- [$$ao.getField("text")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$ao]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      unnest $$ao <- scan-collection($$65) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                      -- UNNEST  |PARTITIONED|
+                        project ([$$65]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$65] <- [$$p2.getField("arrayOrObject")] [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              replicate [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                              -- REPLICATE  |PARTITIONED|
+                                exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$p2]) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 6.0, op-cost: 4.0, total-cost: 10.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      data-scan []<-[$$62, $$p2] <- test.ColumnDataset project ({arrayOrObject:<[{text:any}],{text:any}>}) filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) range-filter on: or(eq($$p2.getField("arrayOrObject").getField("text"), "7"), and(ge(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "1"), le(scan-collection($$p2.getField("arrayOrObject")).getField("text"), "2"))) [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index 110f95d..72f11b6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16389,6 +16389,16 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="filter/sql-compat">
+        <output-dir compare="Text">filter/sql-compat</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="filter/subplan">
+        <output-dir compare="Text">filter/subplan</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="big-object">
         <output-dir compare="Text">big-object</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/ExistsComparisonExpressionAnnotation.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/ExistsComparisonExpressionAnnotation.java
new file mode 100644
index 0000000..c57cb8a
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/annotations/ExistsComparisonExpressionAnnotation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.annotations;
+
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+
+/**
+ * An annotation to indicate an expression is a comparison for EXISTS/NOT EXISTS
+ */
+public class ExistsComparisonExpressionAnnotation implements IExpressionAnnotation {
+    public static final IExpressionAnnotation INSTANCE = new ExistsComparisonExpressionAnnotation();
+
+    private ExistsComparisonExpressionAnnotation() {
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
index ced2a25..b0cc50d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
@@ -59,6 +59,10 @@
     public static final Set<FunctionIdentifier> FILTER_PUSHABLE_PATH_FUNCTIONS = createFilterPushablePathFunctions();
     public static final Set<FunctionIdentifier> COMPARE_FUNCTIONS = createCompareFunctions();
     public static final Set<FunctionIdentifier> RANGE_FILTER_PUSHABLE_FUNCTIONS = createRangeFilterPushableFunctions();
+    // Set of aggregate functions in a subplan that allows the SELECT in such subplan to be pushed (SOME and EXISTS)
+    public static final Set<FunctionIdentifier> FILTER_PUSHABLE_AGGREGATE_FUNCTIONS =
+            createFilterPushableAggregateFunctions();
+    public static final Set<FunctionIdentifier> FILTER_PROHIBITED_FUNCTIONS = createFilterProhibitedFunctions();
 
     private PushdownUtil() {
     }
@@ -172,6 +176,17 @@
         return arguments.stream().allMatch(arg -> arg.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE);
     }
 
+    public static boolean isSupportedFilterAggregateFunction(ILogicalExpression expression) {
+        FunctionIdentifier fid = getFunctionIdentifier(expression);
+        return fid != null && FILTER_PUSHABLE_AGGREGATE_FUNCTIONS.contains(fid);
+    }
+
+    public static boolean isProhibitedFilterFunction(ILogicalExpression expression) {
+        FunctionIdentifier fid = getFunctionIdentifier(expression);
+        return fid != null && !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(fid)
+                && (isNestedFunction(fid) || isTypeFunction(fid) || FILTER_PROHIBITED_FUNCTIONS.contains(fid));
+    }
+
     public static IAObject getConstant(ILogicalExpression expr) {
         IAlgebricksConstantValue algebricksConstant = ((ConstantExpression) expr).getValue();
         if (algebricksConstant.isTrue()) {
@@ -248,4 +263,16 @@
         pushableFunctions.add(AlgebricksBuiltinFunctions.NOT);
         return pushableFunctions;
     }
+
+    private static Set<FunctionIdentifier> createFilterPushableAggregateFunctions() {
+        Set<FunctionIdentifier> pushableFunctions = new HashSet<>();
+        pushableFunctions.add(BuiltinFunctions.NON_EMPTY_STREAM);
+        return pushableFunctions;
+    }
+
+    private static Set<FunctionIdentifier> createFilterProhibitedFunctions() {
+        Set<FunctionIdentifier> prohibitedFunctions = new HashSet<>();
+        prohibitedFunctions.add(BuiltinFunctions.EMPTY_STREAM);
+        return prohibitedFunctions;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
index 7ca9dc7..ac038d2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
@@ -35,13 +35,4 @@
     String OP_COST_LOCAL = "OP_COST";
     String OP_LEFT_EXCHANGE_COST = "LEFT_EXCHANGE_COST";
     String OP_RIGHT_EXCHANGE_COST = "RIGHT_EXCHANGE_COST";
-
-    /**
-     * An annotation to indicate that a SELECT predicate should not be pushed to data-scan.
-     * <p>
-     * Returns TRUE if it is disallowed, FALSE otherwise.
-     * <p>
-     * If the annotation is missing (i.e., {@code null}), it should mean FALSE.
-     */
-    String DISALLOW_FILTER_PUSHDOWN_TO_SCAN = "DISALLOW_FILTER_PUSHDOWN_TO_SCAN";
 }