[MULTIPLE ISSUES][COMP] Fix multiple pushdown issues

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- ASTERIXDB-3304: Some fields are not projected
- ASTERIXDB-3305: ClassCastException in left outer joins

Change-Id: I826ec12d598ef9fc38ded47f6f29e28ed75a047b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17933
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index 3cc19d3..f55c4a5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -88,6 +88,7 @@
          * supports value-access, filter, and/or range-filter.
          */
         run = shouldRun(context);
+        boolean changed = false;
         if (run) {
             // Context holds all the necessary information to perform pushdowns
             PushdownContext pushdownContext = new PushdownContext();
@@ -97,11 +98,11 @@
             // Execute several optimization passes to perform the pushdown
             PushdownProcessorsExecutor pushdownProcessorsExecutor = new PushdownProcessorsExecutor();
             addProcessors(pushdownProcessorsExecutor, pushdownContext, context);
-            pushdownProcessorsExecutor.execute();
+            changed = pushdownProcessorsExecutor.execute();
             pushdownProcessorsExecutor.finalizePushdown(pushdownContext, context);
             run = false;
         }
-        return false;
+        return changed;
     }
 
     private void addProcessors(PushdownProcessorsExecutor pushdownProcessorsExecutor, PushdownContext pushdownContext,
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index 219ef9c..7d1e068 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -181,8 +181,9 @@
             case VARIABLE:
                 LogicalVariable variable = ((VariableReferenceExpression) expression).getVariableReference();
                 DefineDescriptor defineDescriptor = defineChain.get(variable);
-                if (defineDescriptor.isScanDefinition()) {
-                    // Reached the recordVariable
+                if (defineDescriptor == null || defineDescriptor.isScanDefinition()) {
+                    // Reached un-filterable source variable (e.g., originated from an internal dataset in row format)
+                    // or filterable source recordVariable (e.g., columnar dataset or external dataset with prefix)
                     return expression;
                 }
                 return cloneAndInline(defineDescriptor.getExpression(), context);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 957e988..01d3aeb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -56,10 +56,12 @@
         processors.add(processor);
     }
 
-    public void execute() throws AlgebricksException {
+    public boolean execute() throws AlgebricksException {
+        boolean changed = false;
         for (IPushdownProcessor processor : processors) {
-            processor.process();
+            changed |= processor.process();
         }
+        return changed;
     }
 
     public void finalizePushdown(PushdownContext pushdownContext, IOptimizationContext context) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 6ca553a..9a88036 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -53,15 +53,17 @@
     }
 
     @Override
-    public final void process() throws AlgebricksException {
+    public final boolean process() throws AlgebricksException {
         List<ScanDefineDescriptor> scanDefineDescriptors = pushdownContext.getRegisteredScans();
+        boolean changed = false;
         for (ScanDefineDescriptor scanDefineDescriptor : scanDefineDescriptors) {
             if (skip(scanDefineDescriptor)) {
                 continue;
             }
             prepareScan(scanDefineDescriptor);
-            pushdownFilter(scanDefineDescriptor, scanDefineDescriptor);
+            changed |= pushdownFilter(scanDefineDescriptor, scanDefineDescriptor);
         }
+        return changed;
     }
 
     /**
@@ -119,9 +121,10 @@
     protected abstract void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor,
             ILogicalExpression inlinedExpr) throws AlgebricksException;
 
-    private void pushdownFilter(DefineDescriptor defineDescriptor, ScanDefineDescriptor scanDefineDescriptor)
+    private boolean pushdownFilter(DefineDescriptor defineDescriptor, ScanDefineDescriptor scanDefineDescriptor)
             throws AlgebricksException {
         List<UseDescriptor> useDescriptors = pushdownContext.getUseDescriptors(defineDescriptor);
+        boolean changed = false;
         for (UseDescriptor useDescriptor : useDescriptors) {
             /*
              * Pushdown works only if the scope(use) and scope(scan) are the same, as we cannot pushdown when
@@ -132,18 +135,20 @@
                     && (useOperator.getOperatorTag() == LogicalOperatorTag.SELECT
                             || useOperator.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN)
                     && isPushdownAllowed(useOperator)) {
-                inlineAndPushdownFilter(useDescriptor, scanDefineDescriptor);
+                changed |= inlineAndPushdownFilter(useDescriptor, scanDefineDescriptor);
             } else if (useOperator.getOperatorTag() == LogicalOperatorTag.INNERJOIN) {
-                inlineAndPushdownFilter(useDescriptor, scanDefineDescriptor);
+                changed |= inlineAndPushdownFilter(useDescriptor, scanDefineDescriptor);
             }
         }
 
         for (UseDescriptor useDescriptor : useDescriptors) {
             DefineDescriptor nextDefineDescriptor = pushdownContext.getDefineDescriptor(useDescriptor);
             if (nextDefineDescriptor != null) {
-                pushdownFilter(nextDefineDescriptor, scanDefineDescriptor);
+                changed |= pushdownFilter(nextDefineDescriptor, scanDefineDescriptor);
             }
         }
+
+        return changed;
     }
 
     private boolean isPushdownAllowed(ILogicalOperator useOperator) {
@@ -152,24 +157,26 @@
         return disallowed == Boolean.FALSE;
     }
 
-    private void inlineAndPushdownFilter(UseDescriptor useDescriptor, ScanDefineDescriptor scanDefineDescriptor)
+    private boolean inlineAndPushdownFilter(UseDescriptor useDescriptor, ScanDefineDescriptor scanDefineDescriptor)
             throws AlgebricksException {
         ILogicalOperator selectOp = useDescriptor.getOperator();
         if (visitedOperators.contains(selectOp)) {
             // Skip and follow through to find any other selects that can be pushed down
-            return;
+            return false;
         }
-
+        boolean changed = false;
         // Get a clone of the SELECT expression and inline it
         ILogicalExpression inlinedExpr = pushdownContext.cloneAndInlineExpression(useDescriptor, context);
         // Prepare for pushdown
         preparePushdown(useDescriptor);
         if (pushdownFilterExpression(inlinedExpr)) {
             putFilterInformation(scanDefineDescriptor, inlinedExpr);
+            changed = true;
         }
 
         // Do not push down a select twice.
         visitedOperators.add(selectOp);
+        return changed;
     }
 
     protected final boolean pushdownFilterExpression(ILogicalExpression expression) throws AlgebricksException {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnValueAccessPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnValueAccessPushdownProcessor.java
index 0715dc4..9378cff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnValueAccessPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnValueAccessPushdownProcessor.java
@@ -48,20 +48,25 @@
     }
 
     @Override
-    public void process() throws AlgebricksException {
+    public boolean process() throws AlgebricksException {
         List<ScanDefineDescriptor> scanDefineDescriptors = pushdownContext.getRegisteredScans();
+        boolean changed = false;
         for (ScanDefineDescriptor scanDefineDescriptor : scanDefineDescriptors) {
             if (!DatasetUtil.isFieldAccessPushdownSupported(scanDefineDescriptor.getDataset())) {
                 continue;
             }
             pushdownFieldAccessForDataset(scanDefineDescriptor);
-            scanDefineDescriptor
-                    .setRecordNode((RootExpectedSchemaNode) builder.getNode(scanDefineDescriptor.getVariable()));
+            RootExpectedSchemaNode root = (RootExpectedSchemaNode) builder.getNode(scanDefineDescriptor.getVariable());
+            scanDefineDescriptor.setRecordNode(root);
+            changed |= !root.isAllFields();
             if (scanDefineDescriptor.hasMeta()) {
-                scanDefineDescriptor.setMetaNode(
-                        (RootExpectedSchemaNode) builder.getNode(scanDefineDescriptor.getMetaRecordVariable()));
+                RootExpectedSchemaNode metaRoot =
+                        (RootExpectedSchemaNode) builder.getNode(scanDefineDescriptor.getMetaRecordVariable());
+                changed |= !metaRoot.isAllFields();
+                scanDefineDescriptor.setMetaNode(metaRoot);
             }
         }
+        return changed;
     }
 
     private void pushdownFieldAccessForDataset(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IPushdownProcessor.java
index d7c7a40..6e6544c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IPushdownProcessor.java
@@ -21,5 +21,5 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public interface IPushdownProcessor {
-    void process() throws AlgebricksException;
+    boolean process() throws AlgebricksException;
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java
index 6cec455..3939873 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/InlineFilterExpressionsProcessor.java
@@ -46,12 +46,14 @@
     }
 
     @Override
-    public void process() throws AlgebricksException {
+    public boolean process() throws AlgebricksException {
         List<ScanDefineDescriptor> scanDefineDescriptors = pushdownContext.getRegisteredScans();
         for (ScanDefineDescriptor scanDefineDescriptor : scanDefineDescriptors) {
             scanDefineDescriptor.setFilterExpression(inline(scanDefineDescriptor.getFilterExpression()));
             scanDefineDescriptor.setRangeFilterExpression(inline(scanDefineDescriptor.getRangeFilterExpression()));
         }
+        // Should always return false as this processor relies on filter pushdown
+        return false;
     }
 
     private ILogicalExpression inline(ILogicalExpression expression) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionValueAccessPushdownVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionValueAccessPushdownVisitor.java
index 15c37d3..45dfcff 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionValueAccessPushdownVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionValueAccessPushdownVisitor.java
@@ -18,11 +18,12 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown.visitor;
 
-import static org.apache.asterix.metadata.utils.PushdownUtil.ALLOWED_FUNCTIONS;
 import static org.apache.asterix.metadata.utils.PushdownUtil.SUPPORTED_FUNCTIONS;
+import static org.apache.asterix.metadata.utils.PushdownUtil.YIELDABLE_FUNCTIONS;
 
 import java.util.List;
 
+import org.apache.asterix.metadata.utils.PushdownUtil;
 import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaBuilder;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -42,33 +43,32 @@
 
     public boolean transform(ILogicalExpression expression, LogicalVariable producedVariable,
             IVariableTypeEnvironment typeEnv) throws AlgebricksException {
-        pushValueAccessExpression(expression, producedVariable, typeEnv);
-        return false;
+        return pushValueAccessExpression(expression, producedVariable, typeEnv);
     }
 
-    private void pushValueAccessExpression(Mutable<ILogicalExpression> exprRef, LogicalVariable producedVar,
+    private boolean pushValueAccessExpression(Mutable<ILogicalExpression> exprRef, LogicalVariable producedVar,
             IVariableTypeEnvironment typeEnv) throws AlgebricksException {
-        pushValueAccessExpression(exprRef.getValue(), producedVar, typeEnv);
+        return pushValueAccessExpression(exprRef.getValue(), producedVar, typeEnv);
     }
 
     /**
      * Pushdown field access expressions and array access expressions down
      */
-    private void pushValueAccessExpression(ILogicalExpression expr, LogicalVariable producedVar,
+    private boolean pushValueAccessExpression(ILogicalExpression expr, LogicalVariable producedVar,
             IVariableTypeEnvironment typeEnv) throws AlgebricksException {
         if (skipPushdown(expr)) {
-            return;
+            return false;
         }
 
         final AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
 
         if (isSuccessfullyPushedDown(funcExpr, producedVar, typeEnv)) {
             //We successfully pushed down the value access function
-            return;
+            return true;
         }
 
         //Check nested arguments if contains any pushable value access
-        pushValueAccessExpressionArg(funcExpr.getArguments(), producedVar, typeEnv);
+        return pushValueAccessExpressionArg(funcExpr.getArguments(), producedVar, typeEnv);
     }
 
     /**
@@ -82,12 +82,12 @@
             return true;
         }
         return expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL || builder.isEmpty()
-                || isTypeCheckOnVariable(expr);
+                || shouldYieldVariables(expr);
     }
 
     /**
-     * If the expression is a type-check function on a variable. We should stop as we do not want to unregister
-     * the variable used by the type-check function.
+     * If the expression is an 'allowed' function with only variable arguments, we should stop as we do not
+     * want to unregister the variable used by the function.
      * <p>
      * Example:
      * SELECT p.personInfo.name
@@ -110,21 +110,23 @@
      * @param expression expression
      * @return if the function is a type-check function and has a variable argument.
      */
-    private boolean isTypeCheckOnVariable(ILogicalExpression expression) {
+    private boolean shouldYieldVariables(ILogicalExpression expression) {
         AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression;
-        return ALLOWED_FUNCTIONS.contains(funcExpr.getFunctionIdentifier())
-                && funcExpr.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE;
+        return YIELDABLE_FUNCTIONS.contains(funcExpr.getFunctionIdentifier())
+                && PushdownUtil.isAllVariableExpressions(funcExpr.getArguments());
     }
 
-    private void pushValueAccessExpressionArg(List<Mutable<ILogicalExpression>> exprList, LogicalVariable producedVar,
-            IVariableTypeEnvironment typeEnv) throws AlgebricksException {
+    private boolean pushValueAccessExpressionArg(List<Mutable<ILogicalExpression>> exprList,
+            LogicalVariable producedVar, IVariableTypeEnvironment typeEnv) throws AlgebricksException {
+        boolean changed = false;
         for (Mutable<ILogicalExpression> exprRef : exprList) {
             /*
              * We need to set the produced variable as null here as the produced variable will not correspond to the
              * nested expression.
              */
-            pushValueAccessExpression(exprRef, producedVar, typeEnv);
+            changed |= pushValueAccessExpression(exprRef, producedVar, typeEnv);
         }
+        return changed;
     }
 
     private boolean isSuccessfullyPushedDown(AbstractFunctionCallExpression funcExpr, LogicalVariable producedVar,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.001.ddl.sqlpp
new file mode 100644
index 0000000..839c5f0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.001.ddl.sqlpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE COLLECTION TestOpenColumn1
+PRIMARY KEY (c_id:bigint)
+WITH {
+    "storage-format": {"format": "column"}
+};
+
+
+CREATE COLLECTION TestOpenColumn2
+PRIMARY KEY (c_id:bigint)
+WITH {
+    "storage-format": {"format": "column"}
+};
+
+CREATE INDEX idx_column_t1_s ON TestOpenColumn1(c_s:string);
+CREATE INDEX idx_column_t2_s ON TestOpenColumn2(c_s:string);
+
+CREATE COLLECTION TestOpenRow1
+PRIMARY KEY (c_id:bigint)
+WITH {
+    "storage-format": {"format": "row"}
+};
+
+
+CREATE COLLECTION TestOpenRow2
+PRIMARY KEY (c_id:bigint)
+WITH {
+    "storage-format": {"format": "row"}
+};
+
+CREATE INDEX idx_row_t1_s ON TestOpenRow1(c_s:string);
+CREATE INDEX idx_row_t2_s ON TestOpenRow2(c_s:string);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.002.update.sqlpp
new file mode 100644
index 0000000..7a49ddf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.002.update.sqlpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO TestOpenColumn1 ({
+    "c_id": 1,
+    "c_x": 1,
+    "c_s": "hello",
+    "c_i64": 2,
+    "c_i8": 2,
+    "c_d": 2
+});
+
+INSERT INTO TestOpenColumn2 ({
+    "c_id": 105,
+    "c_x": 105,
+    "c_s": "hello",
+    "c_i64": 2.25,
+    "c_i8": 10000.25,
+    "c_d": 3.5
+});
+
+INSERT INTO TestOpenRow1 ({
+    "c_id": 1,
+    "c_x": 1,
+    "c_s": "hello",
+    "c_i64": 2,
+    "c_i8": 2,
+    "c_d": 2
+});
+
+INSERT INTO TestOpenRow2 ({
+    "c_id": 105,
+    "c_x": 105,
+    "c_s": "hello",
+    "c_i64": 2.25,
+    "c_i8": 10000.25,
+    "c_d": 3.5
+});
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.010.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.010.query.sqlpp
new file mode 100644
index 0000000..066ff29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.010.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenRow1 t1, TestOpenRow2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.011.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.011.query.sqlpp
new file mode 100644
index 0000000..20c9cd2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.011.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.012.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.012.query.sqlpp
new file mode 100644
index 0000000..767064d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.012.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.013.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.013.query.sqlpp
new file mode 100644
index 0000000..394b620
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.013.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+  AND t1.c_i64 = 2
+  AND t2.c_i64 = 2.25
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.014.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.014.query.sqlpp
new file mode 100644
index 0000000..cddb7b3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.014.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+  AND t1.c_i64 = 2
+  AND t2.c_i64 = 2.25
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.020.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.020.query.sqlpp
new file mode 100644
index 0000000..5c05355
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.020.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenRow2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.021.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.021.query.sqlpp
new file mode 100644
index 0000000..e7080da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.021.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenRow2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.022.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.022.query.sqlpp
new file mode 100644
index 0000000..04c6b5a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.022.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenRow2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+  AND t1.c_i64 = 2
+  AND t2.c_i64 = 2.25
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.023.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.023.query.sqlpp
new file mode 100644
index 0000000..d845665
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.023.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenColumn1 t1, TestOpenRow2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+  AND t1.c_i64 = 2
+  AND t2.c_i64 = 2.25
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.030.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.030.query.sqlpp
new file mode 100644
index 0000000..67f6c03
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.030.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenRow1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.031.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.031.query.sqlpp
new file mode 100644
index 0000000..0cd21d6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.031.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenRow1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.032.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.032.query.sqlpp
new file mode 100644
index 0000000..5817b15
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.032.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenRow1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+  AND t1.c_i64 = 2
+  AND t2.c_i64 = 2.25
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.033.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.033.query.sqlpp
new file mode 100644
index 0000000..e4093c2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/condition-pushdown/condition-pushdown.033.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT t1.c_x c1, t2.c_x c2
+FROM  TestOpenRow1 t1, TestOpenColumn2 t2
+WHERE to_string(t1.c_s) /*+ indexnl */ = t2.c_s
+  AND t1.c_i64 = 2
+  AND t2.c_i64 = 2.25
+ORDER BY t1.c_x, t2.c_x;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.001.ddl.sqlpp
new file mode 100644
index 0000000..f17480d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.001.ddl.sqlpp
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+CREATE COLLECTION DBLP
+PRIMARY KEY (id:int)
+WITH {
+    "storage-format": {"format": "column"}
+};
+
+CREATE COLLECTION CSX
+PRIMARY KEY (id:int)
+WITH {
+    "storage-format": {"format": "column"}
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.001.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.001.update.sqlpp
new file mode 100644
index 0000000..b7daded
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.001.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO DBLP {"id":6, "authors":"asdaskhdaskj dhas dhkasj dsakjdh"};
+INSERT INTO CSX {"id":7, "authors":"asdaskhdaskj dhas dhkasj dsakjdh"};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.002.query.sqlpp
new file mode 100644
index 0000000..dc6428b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/join/left-outer/left-outer.002.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SELECT a.id AS aid, b.id AS bid
+FROM DBLP AS a
+LEFT OUTER UNNEST
+(
+  SELECT VALUE b
+  FROM CSX AS b
+  WHERE a.authors = b.authors
+) AS b
+ORDER BY a.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.015.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.015.query.sqlpp
new file mode 100644
index 0000000..d9f2390
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.015.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+-- Should request the entire 'x' instead of {"x": {"y":any}}
+SELECT COUNT(c.x.y)
+FROM ColumnDataset c, RowDataset r
+WHERE c.x = r.x
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.010.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.010.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.010.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.011.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.011.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.011.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.012.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.012.plan
new file mode 100644
index 0000000..5afd641
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.012.plan
@@ -0,0 +1,54 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"c1": $$40, "c2": $$41}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$40(ASC), $$41(ASC) ]  |PARTITIONED|
+          order (ASC, $$40) (ASC, $$41) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$40, $$41]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$37, $$t2.getField("c_s"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$41] <- [$$t2.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$40, $$37, $$t2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$36, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$45, 1, $$45, true, true, true) project ({c_s:any,c_x:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            order (ASC, $$45) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STABLE_SORT [$$45(ASC)]  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                project ([$$40, $$37, $$45]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    unnest-map [$$44, $$45] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$37, 1, $$37, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        project ([$$37, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          assign [$$37, $$40] <- [to-string($$t1.getField("c_s")), $$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |PARTITIONED|
+                                            project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                data-scan []<-[$$35, $$t1] <- test.TestOpenColumn1 project ({c_s:any,c_x:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.013.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.013.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.013.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.014.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.014.plan
new file mode 100644
index 0000000..493b14f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.014.plan
@@ -0,0 +1,60 @@
+distribute result [$$37] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$37] <- [{"c1": $$47, "c2": $$48}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$47(ASC), $$48(ASC) ]  |PARTITIONED|
+          order (ASC, $$47) (ASC, $$48) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$47(ASC), $$48(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$47, $$48]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$42, $$t2.getField("c_s"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  select (eq($$t2.getField("c_i64"), 2.25)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    assign [$$48] <- [$$t2.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$47, $$42, $$t2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          unnest-map [$$41, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$52, 1, $$52, true, true, true) project ({c_i64:any,c_s:any,c_x:any}) range-filter on: eq($$t2.getField("c_i64"), 2.25) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              order (ASC, $$52) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STABLE_SORT [$$52(ASC)]  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$47, $$42, $$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      unnest-map [$$51, $$52] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$42, 1, $$42, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          project ([$$47, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$42] <- [to-string($$t1.getField("c_s"))] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              select (eq($$t1.getField("c_i64"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                assign [$$47] <- [$$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      data-scan []<-[$$40, $$t1] <- test.TestOpenColumn1 project ({c_i64:any,c_s:any,c_x:any}) filter on: eq($$t1.getField("c_i64"), 2) range-filter on: eq($$t1.getField("c_i64"), 2) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.020.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.020.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.020.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.021.plan
new file mode 100644
index 0000000..9a42877
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.021.plan
@@ -0,0 +1,54 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"c1": $$40, "c2": $$41}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$40(ASC), $$41(ASC) ]  |PARTITIONED|
+          order (ASC, $$40) (ASC, $$41) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$40, $$41]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$37, $$t2.getField("c_s"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$41] <- [$$t2.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$40, $$37, $$t2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$36, $$t2] <- index-search("TestOpenRow2", 0, "Default", "test", "TestOpenRow2", true, false, 1, $$45, 1, $$45, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            order (ASC, $$45) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STABLE_SORT [$$45(ASC)]  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                project ([$$40, $$37, $$45]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    unnest-map [$$44, $$45] <- index-search("idx_row_t2_s", 0, "Default", "test", "TestOpenRow2", true, true, 1, $$37, 1, $$37, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        project ([$$37, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          assign [$$37, $$40] <- [to-string($$t1.getField("c_s")), $$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |PARTITIONED|
+                                            project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                data-scan []<-[$$35, $$t1] <- test.TestOpenColumn1 project ({c_s:any,c_x:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.022.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.022.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.022.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.023.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.023.plan
new file mode 100644
index 0000000..3d11eab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.023.plan
@@ -0,0 +1,60 @@
+distribute result [$$37] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$37] <- [{"c1": $$47, "c2": $$48}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$47(ASC), $$48(ASC) ]  |PARTITIONED|
+          order (ASC, $$47) (ASC, $$48) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$47(ASC), $$48(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$47, $$48]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$42, $$t2.getField("c_s"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  select (eq($$t2.getField("c_i64"), 2.25)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    assign [$$48] <- [$$t2.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$47, $$42, $$t2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          unnest-map [$$41, $$t2] <- index-search("TestOpenRow2", 0, "Default", "test", "TestOpenRow2", true, false, 1, $$52, 1, $$52, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              order (ASC, $$52) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STABLE_SORT [$$52(ASC)]  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$47, $$42, $$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      unnest-map [$$51, $$52] <- index-search("idx_row_t2_s", 0, "Default", "test", "TestOpenRow2", true, true, 1, $$42, 1, $$42, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          project ([$$47, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$42] <- [to-string($$t1.getField("c_s"))] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              select (eq($$t1.getField("c_i64"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                assign [$$47] <- [$$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      data-scan []<-[$$40, $$t1] <- test.TestOpenColumn1 project ({c_i64:any,c_s:any,c_x:any}) filter on: eq($$t1.getField("c_i64"), 2) range-filter on: eq($$t1.getField("c_i64"), 2) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.030.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.030.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.030.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.031.plan
new file mode 100644
index 0000000..e7345eb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.031.plan
@@ -0,0 +1,54 @@
+distribute result [$$32] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"c1": $$40, "c2": $$41}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$40(ASC), $$41(ASC) ]  |PARTITIONED|
+          order (ASC, $$40) (ASC, $$41) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$40, $$41]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$37, $$t2.getField("c_s"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$41] <- [$$t2.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$40, $$37, $$t2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$36, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$45, 1, $$45, true, true, true) project ({c_s:any,c_x:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            order (ASC, $$45) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STABLE_SORT [$$45(ASC)]  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                project ([$$40, $$37, $$45]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    unnest-map [$$44, $$45] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$37, 1, $$37, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        project ([$$37, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          assign [$$37, $$40] <- [to-string($$t1.getField("c_s")), $$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |PARTITIONED|
+                                            project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                data-scan []<-[$$35, $$t1] <- test.TestOpenRow1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.032.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.032.adm
new file mode 100644
index 0000000..96f213c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.032.adm
@@ -0,0 +1 @@
+{ "c1": 1, "c2": 105 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.033.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.033.plan
new file mode 100644
index 0000000..ff129d8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/condition-pushdown/condition-pushdown.033.plan
@@ -0,0 +1,60 @@
+distribute result [$$37] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$37]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$37] <- [{"c1": $$47, "c2": $$48}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- SORT_MERGE_EXCHANGE [$$47(ASC), $$48(ASC) ]  |PARTITIONED|
+          order (ASC, $$47) (ASC, $$48) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STABLE_SORT [$$47(ASC), $$48(ASC)]  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$47, $$48]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$42, $$t2.getField("c_s"))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  select (eq($$t2.getField("c_i64"), 2.25)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    assign [$$48] <- [$$t2.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$47, $$42, $$t2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          unnest-map [$$41, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$52, 1, $$52, true, true, true) project ({c_i64:any,c_s:any,c_x:any}) range-filter on: eq($$t2.getField("c_i64"), 2.25) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              order (ASC, $$52) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STABLE_SORT [$$52(ASC)]  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$47, $$42, $$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      unnest-map [$$51, $$52] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$42, 1, $$42, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          project ([$$47, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$42] <- [to-string($$t1.getField("c_s"))] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              select (eq($$t1.getField("c_i64"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                assign [$$47] <- [$$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      data-scan []<-[$$40, $$t1] <- test.TestOpenRow1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/left-outer/left-outer.002.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/left-outer/left-outer.002.adm
new file mode 100644
index 0000000..a427e98
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/join/left-outer/left-outer.002.adm
@@ -0,0 +1 @@
+{ "aid": 6, "bid": 7 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.015.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.015.plan
new file mode 100644
index 0000000..a2f87a4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.015.plan
@@ -0,0 +1,54 @@
+distribute result [$$52] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$52] <- [{"$1": $$57}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$57] <- [agg-sql-sum($$60)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$60] <- [agg-sql-count($$50)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              project ([$$50]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  join (eq($$55, $$56)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HYBRID_HASH_JOIN [$$55][$$56]  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$$55]  |PARTITIONED|
+                      assign [$$50] <- [$$55.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ASSIGN  |PARTITIONED|
+                        project ([$$55]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$55] <- [$$c.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$c]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$53, $$c] <- test.ColumnDataset project ({x:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- HASH_PARTITION_EXCHANGE [$$56]  |PARTITIONED|
+                      project ([$$56]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$56] <- [$$r.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$r]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$54, $$r] <- test.RowDataset [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.012.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.012.plan
new file mode 100644
index 0000000..47546eb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.012.plan
@@ -0,0 +1,54 @@
+distribute result [$$32] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"c1": $$40, "c2": $$41}] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+        -- SORT_MERGE_EXCHANGE [$$40(ASC), $$41(ASC) ]  |PARTITIONED|
+          order (ASC, $$40) (ASC, $$41) [cardinality: 2.1, op-cost: 2.25, total-cost: 23.25]
+          -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$40, $$41]) [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$37, $$t2.getField("c_s"))) [cardinality: 2.1, op-cost: 10.5, total-cost: 21.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$41] <- [$$t2.getField("c_x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$40, $$37, $$t2]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$36, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$45, 1, $$45, true, true, true) project ({c_s:any,c_x:any}) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            order (ASC, $$45) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STABLE_SORT [$$45(ASC)]  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                project ([$$40, $$37, $$45]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    unnest-map [$$44, $$45] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$37, 1, $$37, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        project ([$$37, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          assign [$$37, $$40] <- [to-string($$t1.getField("c_s")), $$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |PARTITIONED|
+                                            project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                data-scan []<-[$$35, $$t1] <- test.TestOpenColumn1 project ({c_s:any,c_x:any}) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.014.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.014.plan
new file mode 100644
index 0000000..90f5172
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.014.plan
@@ -0,0 +1,60 @@
+distribute result [$$37] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$37]) [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$37] <- [{"c1": $$47, "c2": $$48}] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+        -- SORT_MERGE_EXCHANGE [$$47(ASC), $$48(ASC) ]  |PARTITIONED|
+          order (ASC, $$47) (ASC, $$48) [cardinality: 2.1, op-cost: 2.25, total-cost: 23.25]
+          -- STABLE_SORT [$$47(ASC), $$48(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$47, $$48]) [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$42, $$t2.getField("c_s"))) [cardinality: 2.1, op-cost: 10.5, total-cost: 21.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  select (eq($$t2.getField("c_i64"), 2.25)) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    assign [$$48] <- [$$t2.getField("c_x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$47, $$42, $$t2]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          unnest-map [$$41, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$52, 1, $$52, true, true, true) project ({c_i64:any,c_s:any,c_x:any}) range-filter on: eq($$t2.getField("c_i64"), 2.25) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              order (ASC, $$52) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STABLE_SORT [$$52(ASC)]  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$47, $$42, $$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      unnest-map [$$51, $$52] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$42, 1, $$42, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          project ([$$47, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$42] <- [to-string($$t1.getField("c_s"))] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              select (eq($$t1.getField("c_i64"), 2)) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                assign [$$47] <- [$$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      data-scan []<-[$$40, $$t1] <- test.TestOpenColumn1 project ({c_i64:any,c_s:any,c_x:any}) filter on: eq($$t1.getField("c_i64"), 2) range-filter on: eq($$t1.getField("c_i64"), 2) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.021.plan
new file mode 100644
index 0000000..a93082e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.021.plan
@@ -0,0 +1,54 @@
+distribute result [$$32] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"c1": $$40, "c2": $$41}] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+        -- SORT_MERGE_EXCHANGE [$$40(ASC), $$41(ASC) ]  |PARTITIONED|
+          order (ASC, $$40) (ASC, $$41) [cardinality: 2.1, op-cost: 2.25, total-cost: 23.25]
+          -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$40, $$41]) [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$37, $$t2.getField("c_s"))) [cardinality: 2.1, op-cost: 10.5, total-cost: 21.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$41] <- [$$t2.getField("c_x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$40, $$37, $$t2]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$36, $$t2] <- index-search("TestOpenRow2", 0, "Default", "test", "TestOpenRow2", true, false, 1, $$45, 1, $$45, true, true, true) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            order (ASC, $$45) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STABLE_SORT [$$45(ASC)]  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                project ([$$40, $$37, $$45]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    unnest-map [$$44, $$45] <- index-search("idx_row_t2_s", 0, "Default", "test", "TestOpenRow2", true, true, 1, $$37, 1, $$37, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        project ([$$37, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          assign [$$37, $$40] <- [to-string($$t1.getField("c_s")), $$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |PARTITIONED|
+                                            project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                data-scan []<-[$$35, $$t1] <- test.TestOpenColumn1 project ({c_s:any,c_x:any}) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.023.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.023.plan
new file mode 100644
index 0000000..acadbbd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.023.plan
@@ -0,0 +1,60 @@
+distribute result [$$37] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$37]) [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$37] <- [{"c1": $$47, "c2": $$48}] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+        -- SORT_MERGE_EXCHANGE [$$47(ASC), $$48(ASC) ]  |PARTITIONED|
+          order (ASC, $$47) (ASC, $$48) [cardinality: 2.1, op-cost: 2.25, total-cost: 23.25]
+          -- STABLE_SORT [$$47(ASC), $$48(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$47, $$48]) [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$42, $$t2.getField("c_s"))) [cardinality: 2.1, op-cost: 10.5, total-cost: 21.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  select (eq($$t2.getField("c_i64"), 2.25)) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    assign [$$48] <- [$$t2.getField("c_x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$47, $$42, $$t2]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          unnest-map [$$41, $$t2] <- index-search("TestOpenRow2", 0, "Default", "test", "TestOpenRow2", true, false, 1, $$52, 1, $$52, true, true, true) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              order (ASC, $$52) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STABLE_SORT [$$52(ASC)]  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$47, $$42, $$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      unnest-map [$$51, $$52] <- index-search("idx_row_t2_s", 0, "Default", "test", "TestOpenRow2", true, true, 1, $$42, 1, $$42, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          project ([$$47, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$42] <- [to-string($$t1.getField("c_s"))] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              select (eq($$t1.getField("c_i64"), 2)) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                assign [$$47] <- [$$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      data-scan []<-[$$40, $$t1] <- test.TestOpenColumn1 project ({c_i64:any,c_s:any,c_x:any}) filter on: eq($$t1.getField("c_i64"), 2) range-filter on: eq($$t1.getField("c_i64"), 2) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.031.plan
new file mode 100644
index 0000000..ddb3f27
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.031.plan
@@ -0,0 +1,54 @@
+distribute result [$$32] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$32]) [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$32] <- [{"c1": $$40, "c2": $$41}] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+        -- SORT_MERGE_EXCHANGE [$$40(ASC), $$41(ASC) ]  |PARTITIONED|
+          order (ASC, $$40) (ASC, $$41) [cardinality: 2.1, op-cost: 2.25, total-cost: 23.25]
+          -- STABLE_SORT [$$40(ASC), $$41(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$40, $$41]) [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$37, $$t2.getField("c_s"))) [cardinality: 2.1, op-cost: 10.5, total-cost: 21.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  assign [$$41] <- [$$t2.getField("c_x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                  -- ASSIGN  |PARTITIONED|
+                    project ([$$40, $$37, $$t2]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        unnest-map [$$36, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$45, 1, $$45, true, true, true) project ({c_s:any,c_x:any}) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                        -- BTREE_SEARCH  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            order (ASC, $$45) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STABLE_SORT [$$45(ASC)]  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                project ([$$40, $$37, $$45]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    unnest-map [$$44, $$45] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$37, 1, $$37, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- BTREE_SEARCH  |PARTITIONED|
+                                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                        project ([$$37, $$40]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- STREAM_PROJECT  |PARTITIONED|
+                                          assign [$$37, $$40] <- [to-string($$t1.getField("c_s")), $$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |PARTITIONED|
+                                            project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- STREAM_PROJECT  |PARTITIONED|
+                                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                data-scan []<-[$$35, $$t1] <- test.TestOpenRow1 [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.033.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.033.plan
new file mode 100644
index 0000000..a8e1b83
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/join/condition-pushdown/condition-pushdown.033.plan
@@ -0,0 +1,60 @@
+distribute result [$$37] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$37]) [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$37] <- [{"c1": $$47, "c2": $$48}] [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+      -- ASSIGN  |PARTITIONED|
+        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 23.25]
+        -- SORT_MERGE_EXCHANGE [$$47(ASC), $$48(ASC) ]  |PARTITIONED|
+          order (ASC, $$47) (ASC, $$48) [cardinality: 2.1, op-cost: 2.25, total-cost: 23.25]
+          -- STABLE_SORT [$$47(ASC), $$48(ASC)]  |PARTITIONED|
+            exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              project ([$$47, $$48]) [cardinality: 2.1, op-cost: 0.0, total-cost: 21.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                select (eq($$42, $$t2.getField("c_s"))) [cardinality: 2.1, op-cost: 10.5, total-cost: 21.0]
+                -- STREAM_SELECT  |PARTITIONED|
+                  select (eq($$t2.getField("c_i64"), 2.25)) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                  -- STREAM_SELECT  |PARTITIONED|
+                    assign [$$48] <- [$$t2.getField("c_x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$47, $$42, $$t2]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          unnest-map [$$41, $$t2] <- index-search("TestOpenColumn2", 0, "Default", "test", "TestOpenColumn2", true, false, 1, $$52, 1, $$52, true, true, true) project ({c_i64:any,c_s:any,c_x:any}) range-filter on: eq($$t2.getField("c_i64"), 2.25) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                          -- BTREE_SEARCH  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              order (ASC, $$52) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STABLE_SORT [$$52(ASC)]  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  project ([$$47, $$42, $$52]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      unnest-map [$$51, $$52] <- index-search("idx_column_t2_s", 0, "Default", "test", "TestOpenColumn2", true, true, 1, $$42, 1, $$42, true, true, true) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- BTREE_SEARCH  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                          project ([$$47, $$42]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            assign [$$42] <- [to-string($$t1.getField("c_s"))] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- ASSIGN  |PARTITIONED|
+                                              select (eq($$t1.getField("c_i64"), 2)) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                                              -- STREAM_SELECT  |PARTITIONED|
+                                                assign [$$47] <- [$$t1.getField("c_x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                -- ASSIGN  |PARTITIONED|
+                                                  project ([$$t1]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      data-scan []<-[$$40, $$t1] <- test.TestOpenRow1 [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.015.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.015.plan
new file mode 100644
index 0000000..2c56948
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.015.plan
@@ -0,0 +1,54 @@
+distribute result [$$52] [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$52]) [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$52] <- [{"$1": $$57}] [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$57] <- [agg-sql-sum($$60)] [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$60] <- [agg-sql-count($$50)] [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+            -- AGGREGATE  |PARTITIONED|
+              project ([$$50]) [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 2.1, op-cost: 0.0, total-cost: 12.6]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  join (eq($$55, $$56)) [cardinality: 2.1, op-cost: 4.2, total-cost: 12.6]
+                  -- HYBRID_HASH_JOIN [$$55][$$56]  |PARTITIONED|
+                    exchange [cardinality: 2.1, op-cost: 2.1, total-cost: 4.2]
+                    -- HASH_PARTITION_EXCHANGE [$$55]  |PARTITIONED|
+                      assign [$$50] <- [$$55.getField("y")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- ASSIGN  |PARTITIONED|
+                        project ([$$55]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$55] <- [$$c.getField("x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$c]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 2.1, op-cost: 2.1, total-cost: 4.2]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$53, $$c] <- test.ColumnDataset project ({x:any}) [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    exchange [cardinality: 2.1, op-cost: 2.1, total-cost: 4.2]
+                    -- HASH_PARTITION_EXCHANGE [$$56]  |PARTITIONED|
+                      project ([$$56]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$56] <- [$$r.getField("x")] [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$r]) [cardinality: 2.1, op-cost: 0.0, total-cost: 2.1]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 2.1, op-cost: 2.1, total-cost: 4.2]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$54, $$r] <- test.RowDataset [cardinality: 2.1, op-cost: 2.1, total-cost: 2.1]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
index c18075d..22bc3a9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml
@@ -16264,6 +16264,16 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="column">
+      <compilation-unit name="pushdown/join/condition-pushdown">
+        <output-dir compare="Text">pushdown/join/condition-pushdown</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="pushdown/join/left-outer">
+        <output-dir compare="Text">pushdown/join/left-outer</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="column">
       <compilation-unit name="pushdown/other-pushdowns">
         <output-dir compare="Text">pushdown/other-pushdowns</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
index 7daef5c..aaaeea1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.metadata.utils;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.asterix.om.base.ABoolean;
@@ -30,7 +31,10 @@
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
@@ -43,7 +47,7 @@
 
 public class PushdownUtil {
     //Set of allowed functions that can request a type in its entirety without marking it as leaf (i.e., ANY)
-    public static final Set<FunctionIdentifier> ALLOWED_FUNCTIONS = createAllowedFunctions();
+    public static final Set<FunctionIdentifier> YIELDABLE_FUNCTIONS = createYieldableFunctions();
     //Set of supported array functions
     public static final Set<FunctionIdentifier> ARRAY_FUNCTIONS = createSupportedArrayFunctions();
     //Set of supported functions that we can push down (a.k.a. path functions)
@@ -61,8 +65,9 @@
         if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fieldAccessExpr.getFunctionIdentifier())) {
             return ConstantExpressionUtil.getStringArgument(fieldAccessExpr, 1);
         } else {
-            //FIELD_ACCESS_BY_INDEX
-            ARecordType recordType = (ARecordType) typeEnv.getType(fieldAccessExpr.getArguments().get(0).getValue());
+            // FIELD_ACCESS_BY_INDEX
+            IAType type = (IAType) typeEnv.getType(fieldAccessExpr.getArguments().get(0).getValue());
+            ARecordType recordType = getRecordType(type);
             int fieldIdx = ConstantExpressionUtil.getIntArgument(fieldAccessExpr, 1);
             return recordType.getFieldNames()[fieldIdx];
         }
@@ -138,6 +143,10 @@
         return fid1 != null && fid1.equals(fid2);
     }
 
+    public static boolean isAllVariableExpressions(List<Mutable<ILogicalExpression>> arguments) {
+        return arguments.stream().allMatch(arg -> arg.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE);
+    }
+
     public static IAObject getConstant(ILogicalExpression expr) {
         IAlgebricksConstantValue algebricksConstant = ((ConstantExpression) expr).getValue();
         if (algebricksConstant.isTrue()) {
@@ -165,6 +174,15 @@
         return Set.of(BuiltinFunctions.GET_ITEM, BuiltinFunctions.ARRAY_STAR, BuiltinFunctions.SCAN_COLLECTION);
     }
 
+    private static ARecordType getRecordType(IAType type) {
+        IAType recordType = type;
+        if (type.getTypeTag() == ATypeTag.UNION) {
+            recordType = ((AUnionType) type).getActualType();
+        }
+
+        return (ARecordType) recordType;
+    }
+
     private static Set<FunctionIdentifier> createSupportedFunctions() {
         Set<FunctionIdentifier> supportedFunctions = new HashSet<>();
         supportedFunctions.add(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
@@ -173,11 +191,16 @@
         return supportedFunctions;
     }
 
-    private static Set<FunctionIdentifier> createAllowedFunctions() {
-        return Set.of(BuiltinFunctions.IS_ARRAY, BuiltinFunctions.IS_OBJECT, BuiltinFunctions.IS_ATOMIC,
+    private static Set<FunctionIdentifier> createYieldableFunctions() {
+        return Set.of(BuiltinFunctions.IS_ARRAY, BuiltinFunctions.IS_MULTISET, BuiltinFunctions.IS_OBJECT,
+                BuiltinFunctions.IS_ATOMIC, BuiltinFunctions.IS_BINARY, BuiltinFunctions.IS_POINT,
+                BuiltinFunctions.IS_LINE, BuiltinFunctions.IS_RECTANGLE, BuiltinFunctions.IS_CIRCLE,
+                BuiltinFunctions.IS_POLYGON, BuiltinFunctions.IS_SPATIAL, BuiltinFunctions.IS_DATE,
+                BuiltinFunctions.IS_DATETIME, BuiltinFunctions.IS_TIME, BuiltinFunctions.IS_DURATION,
+                BuiltinFunctions.IS_INTERVAL, BuiltinFunctions.IS_TEMPORAL, BuiltinFunctions.IS_UUID,
                 BuiltinFunctions.IS_NUMBER, BuiltinFunctions.IS_BOOLEAN, BuiltinFunctions.IS_STRING,
-                AlgebricksBuiltinFunctions.IS_MISSING, AlgebricksBuiltinFunctions.IS_NULL, BuiltinFunctions.IS_UNKNOWN,
-                BuiltinFunctions.LT, BuiltinFunctions.LE, BuiltinFunctions.EQ, BuiltinFunctions.GT, BuiltinFunctions.GE,
+                BuiltinFunctions.IS_SYSTEM_NULL, AlgebricksBuiltinFunctions.IS_MISSING,
+                AlgebricksBuiltinFunctions.IS_NULL, BuiltinFunctions.IS_UNKNOWN, BuiltinFunctions.GET_TYPE,
                 BuiltinFunctions.SCALAR_SQL_COUNT);
     }