[ASTERIXDB-3395][COMP] Fix pushing common array expressions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Fix pushing common array expressions

Change-Id: I14b81d8fcf2785fd87c37c516e26b9203acb7ec0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18265
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
index 7184e1f..7b26cf6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
@@ -84,7 +84,26 @@
         return node;
     }
 
-    protected abstract void replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode);
+    protected abstract IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode);
+
+    /**
+     * A child is replaceable if
+     * - child is allowed to be replaced
+     * - AND either of the following is satisfied:
+     * - - child is of type {@link ExpectedSchemaNodeType#ANY}
+     * - - OR the newNode is of type {@link ExpectedSchemaNodeType#UNION}
+     * - - OR the newNode is not replaceable
+     *
+     * @param child   current child
+     * @param newNode the new node to replace the current child
+     * @return true if child is replaceable, false otherwise
+     */
+    protected boolean isChildReplaceable(IExpectedSchemaNode child, IExpectedSchemaNode newNode) {
+        ExpectedSchemaNodeType childType = child.getType();
+        ExpectedSchemaNodeType newType = newNode.getType();
+        return child.allowsReplacing() && (childType == ExpectedSchemaNodeType.ANY
+                || newType == ExpectedSchemaNodeType.UNION || !newNode.allowsReplacing());
+    }
 
     public static AbstractComplexExpectedSchemaNode createNestedNode(ExpectedSchemaNodeType type,
             AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation, String functionName) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AnyExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AnyExpectedSchemaNode.java
index 891d744..80069e3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AnyExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AnyExpectedSchemaNode.java
@@ -59,8 +59,7 @@
         AbstractComplexExpectedSchemaNode parent = getParent();
         AbstractComplexExpectedSchemaNode nestedNode = AbstractComplexExpectedSchemaNode
                 .createNestedNode(expectedNodeType, parent, getSourceLocation(), functionName);
-        parent.replaceChild(this, nestedNode);
-        return nestedNode;
+        return parent.replaceChild(this, nestedNode);
     }
 
     @Override
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
index d7c4948..57e0d98 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
@@ -47,11 +47,16 @@
     }
 
     @Override
-    public void replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
-        if (oldNode != child) {
-            //this should not happen
-            throw new IllegalStateException("Node " + oldNode.getType() + " is not a child");
+    public IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
+        if (child.getType() == newNode.getType()) {
+            // We are trying to replace with the same node type
+            return child;
+        } else if (isChildReplaceable(child, newNode)) {
+            child = newNode;
+            return child;
         }
-        child = newNode;
+
+        // This should never happen, but safeguard against unexpected behavior
+        throw new IllegalStateException("Cannot replace " + child.getType() + " with " + newNode.getType());
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
index fa3c196..d42ffed 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
@@ -61,12 +61,13 @@
         if (parent != null) {
             IExpectedSchemaNode leaf =
                     new AnyExpectedSchemaNode(parent, expr.getSourceLocation(), expr.getFunctionIdentifier().getName());
-            addChild(expr, typeEnv, parent, leaf);
+            IExpectedSchemaNode actualNode = addOrReplaceChild(expr, typeEnv, parent, leaf);
             if (producedVar != null) {
                 //Register the node if a variable is produced
-                varToNode.put(producedVar, leaf);
+                varToNode.put(producedVar, actualNode);
             }
         }
+
         return parent != null;
     }
 
@@ -135,9 +136,8 @@
              */
             AbstractComplexExpectedSchemaNode myNode = AbstractComplexExpectedSchemaNode.createNestedNode(myType,
                     newParent, myExpr.getSourceLocation(), myExpr.getFunctionIdentifier().getName());
-            //Add myNode to the parent
-            addChild(parentFuncExpr, typeEnv, newParent, myNode);
-            return myNode;
+            // Add (or replace old child with) myNode to the parent
+            return addOrReplaceChild(parentFuncExpr, typeEnv, newParent, myNode);
         }
         return null;
     }
@@ -166,18 +166,16 @@
         return newNode;
     }
 
-    public static void addChild(AbstractFunctionCallExpression parentExpr, IVariableTypeEnvironment typeEnv,
-            AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child) throws AlgebricksException {
+    public static IExpectedSchemaNode addOrReplaceChild(AbstractFunctionCallExpression parentExpr,
+            IVariableTypeEnvironment typeEnv, AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child)
+            throws AlgebricksException {
         switch (parent.getType()) {
             case OBJECT:
-                handleObject(parentExpr, typeEnv, parent, child);
-                break;
+                return handleObject(parentExpr, typeEnv, parent, child);
             case ARRAY:
-                handleArray(parent, child);
-                break;
+                return handleArray(parent, child);
             case UNION:
-                handleUnion(parentExpr, parent, child);
-                break;
+                return handleUnion(parentExpr, parent, child);
             default:
                 throw new IllegalStateException("Node " + parent.getType() + " is not nested");
 
@@ -194,25 +192,43 @@
         throw new IllegalStateException("Function " + fid + " should not be pushed down");
     }
 
-    private static void handleObject(AbstractFunctionCallExpression parentExpr, IVariableTypeEnvironment typeEnv,
-            AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child) throws AlgebricksException {
+    private static IExpectedSchemaNode handleObject(AbstractFunctionCallExpression parentExpr,
+            IVariableTypeEnvironment typeEnv, AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child)
+            throws AlgebricksException {
         String fieldName = PushdownUtil.getFieldName(parentExpr, typeEnv);
         ObjectExpectedSchemaNode objectNode = (ObjectExpectedSchemaNode) parent;
-        objectNode.addChild(fieldName, child);
+        IExpectedSchemaNode actualChild = objectNode.getChildren().get(fieldName);
+        if (actualChild == null) {
+            objectNode.addChild(fieldName, child);
+            actualChild = child;
+        } else {
+            actualChild = objectNode.replaceChild(actualChild, child);
+        }
+
+        return actualChild;
     }
 
-    private static void handleArray(AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child) {
+    private static IExpectedSchemaNode handleArray(AbstractComplexExpectedSchemaNode parent,
+            IExpectedSchemaNode child) {
         ArrayExpectedSchemaNode arrayNode = (ArrayExpectedSchemaNode) parent;
-        arrayNode.addChild(child);
+        IExpectedSchemaNode actualChild = arrayNode.getChild();
+        if (actualChild == null) {
+            arrayNode.addChild(child);
+            actualChild = child;
+        } else {
+            actualChild = arrayNode.replaceChild(actualChild, child);
+        }
+
+        return actualChild;
     }
 
-    private static void handleUnion(AbstractFunctionCallExpression parentExpr, AbstractComplexExpectedSchemaNode parent,
-            IExpectedSchemaNode child) throws AlgebricksException {
+    private static IExpectedSchemaNode handleUnion(AbstractFunctionCallExpression parentExpr,
+            AbstractComplexExpectedSchemaNode parent, IExpectedSchemaNode child) throws AlgebricksException {
         UnionExpectedSchemaNode unionNode = (UnionExpectedSchemaNode) parent;
         ExpectedSchemaNodeType parentType = getExpectedNestedNodeType(parentExpr);
         AbstractComplexExpectedSchemaNode actualParent = unionNode.getChild(parentType);
         child.setParent(actualParent);
-        addChild(parentExpr, null, actualParent, child);
+        return addOrReplaceChild(parentExpr, null, actualParent, child);
     }
 
     private static boolean isVariable(ILogicalExpression expr) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
index 2745a69..aa48e60 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
@@ -21,8 +21,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ObjectExpectedSchemaNode extends AbstractComplexExpectedSchemaNode {
@@ -57,9 +55,19 @@
     }
 
     @Override
-    public void replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
+    public IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
         String fieldName = getChildFieldName(oldNode);
-        children.replace(fieldName, newNode);
+        IExpectedSchemaNode child = children.get(fieldName);
+        if (child.getType() == newNode.getType()) {
+            // We are trying to replace with the same node type
+            return child;
+        } else if (isChildReplaceable(child, newNode)) {
+            children.replace(fieldName, newNode);
+            return newNode;
+        }
+
+        // This should never happen, but safeguard against unexpected behavior
+        throw new IllegalStateException("Cannot replace " + child.getType() + " with " + newNode.getType());
     }
 
     public String getChildFieldName(IExpectedSchemaNode requestedChild) {
@@ -77,12 +85,4 @@
         }
         return key;
     }
-
-    protected IAType getType(IAType childType, IExpectedSchemaNode childNode, String typeName) {
-        String key = getChildFieldName(childNode);
-        IAType[] fieldTypes = { childType };
-        String[] fieldNames = { key };
-
-        return new ARecordType("typeName", fieldNames, fieldTypes, false);
-    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
index 1bd316b..af6d2be 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
@@ -38,7 +38,7 @@
      * replace a child of a UNION type to ANY. We can only replace the union itself to ANY.
      */
     @Override
-    protected void replaceChild(IExpectedSchemaNode oldChildNode, IExpectedSchemaNode newChildNode) {
+    protected IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldChildNode, IExpectedSchemaNode newChildNode) {
         throw new UnsupportedOperationException("Cannot replace a child of UNION");
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java
index 0f2a36a..cccc936 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/visitor/ExpressionToExpectedSchemaNodeVisitor.java
@@ -113,7 +113,7 @@
         AbstractComplexExpectedSchemaNode newParent = replaceIfNeeded(parent, expr);
         IExpectedSchemaNode myNode =
                 new AnyExpectedSchemaNode(newParent, expr.getSourceLocation(), expr.getFunctionIdentifier().getName());
-        ExpectedSchemaBuilder.addChild(expr, typeEnv, newParent, myNode);
+        ExpectedSchemaBuilder.addOrReplaceChild(expr, typeEnv, newParent, myNode);
         return myNode;
     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.016.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.016.query.sqlpp
new file mode 100644
index 0000000..852ef58
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.016.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT COUNT(*)
+FROM ColumnDataset c
+WHERE (ANY e IN c.val1 SATISFIES x=1 END)
+   OR (ANY e IN c.val1 SATISFIES x=2 END)
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.017.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.017.query.sqlpp
new file mode 100644
index 0000000..233a7c3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.017.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT COUNT(*)
+FROM ColumnDataset c
+WHERE (ANY e IN c.val1 SATISFIES x=1 END)
+   OR (ANY e IN c.val1 SATISFIES y=2 END)
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.018.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.018.query.sqlpp
new file mode 100644
index 0000000..bcc77de
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.018.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT COUNT(*)
+FROM ColumnDataset c
+WHERE (ANY e IN c.val1 SATISFIES x=1 END)
+   OR (ANY e IN c.val2 SATISFIES y=2 END)
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.019.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.019.query.sqlpp
new file mode 100644
index 0000000..23fcf0e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.019.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT c.f1[0].f2[0][0],
+       c.f1[1].f3[1],
+       c.f1[1].f2[1][1]
+FROM ColumnDataset c
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.query.sqlpp
new file mode 100644
index 0000000..305428d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT COUNT(*)
+FROM ParquetDataset1 p
+WHERE (ANY e IN p.val1 SATISFIES x=1 END)
+   OR (ANY e IN p.val1 SATISFIES x=2 END)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.query.sqlpp
new file mode 100644
index 0000000..884e43b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT COUNT(*)
+FROM ParquetDataset1 p
+WHERE (ANY e IN p.val1 SATISFIES x=1 END)
+   OR (ANY e IN p.val1 SATISFIES y=2 END)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.10.query.sqlpp
new file mode 100644
index 0000000..cde8e27
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.10.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT COUNT(*)
+FROM ParquetDataset1 p
+WHERE (ANY e IN p.val1 SATISFIES x=1 END)
+   OR (ANY e IN p.val2 SATISFIES y=2 END)
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.11.query.sqlpp
new file mode 100644
index 0000000..42c299c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/parquet/pushdown-plans/pushdown-plans.11.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+SET `compiler.external.field.pushdown` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT p.f1[0].f2[0][0],
+       p.f1[1].f3[1],
+       p.f1[1].f2[1][1]
+FROM ParquetDataset1 p
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.016.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.016.plan
new file mode 100644
index 0000000..fe8cca9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.016.plan
@@ -0,0 +1,58 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$67}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$67] <- [agg-sql-sum($$72)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$72] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$71, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$71] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    subplan {
+                              aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                select (eq($$69, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |LOCAL|
+                                  assign [$$69] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$68]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$68] <- [$$c.getField("val1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$c]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$66, $$c] <- test.ColumnDataset project ({val1:[{x:any}]}) filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("x"), 2)) range-filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("x"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.017.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.017.plan
new file mode 100644
index 0000000..3d9bcfe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.017.plan
@@ -0,0 +1,58 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$67}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$67] <- [agg-sql-sum($$72)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$72] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$71, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$71] <- [$$e.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    subplan {
+                              aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                select (eq($$69, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |LOCAL|
+                                  assign [$$69] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$68]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$68] <- [$$c.getField("val1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$c]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$66, $$c] <- test.ColumnDataset project ({val1:[{x:any,y:any}]}) filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("y"), 2)) range-filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.018.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.018.plan
new file mode 100644
index 0000000..5e6a214
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.018.plan
@@ -0,0 +1,60 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$67}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$67] <- [agg-sql-sum($$72)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$72] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$71, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$71] <- [$$e.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$70) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    project ([$$70, $$53]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      subplan {
+                                aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- AGGREGATE  |LOCAL|
+                                  select (eq($$69, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_SELECT  |LOCAL|
+                                    assign [$$69] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ASSIGN  |LOCAL|
+                                      unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- UNNEST  |LOCAL|
+                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                             } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- SUBPLAN  |PARTITIONED|
+                        project ([$$70, $$68]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$70, $$68] <- [$$c.getField("val2"), $$c.getField("val1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$c]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$66, $$c] <- test.ColumnDataset project ({val2:[{y:any}],val1:[{x:any}]}) filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val2")).getField("y"), 2)) range-filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val2")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.019.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.019.plan
new file mode 100644
index 0000000..9089f9f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.019.plan
@@ -0,0 +1,24 @@
+distribute result [$$31] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$31]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$31] <- [{"$1": get-item(get-item(get-item($$32, 0).getField("f2"), 0), 0), "$2": get-item($$47.getField("f3"), 1), "$3": get-item(get-item($$47.getField("f2"), 1), 1)}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        assign [$$47] <- [get-item($$32, 1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            assign [$$32] <- [$$c.getField("f1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ASSIGN  |PARTITIONED|
+              project ([$$c]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  data-scan []<-[$$33, $$c] <- test.ColumnDataset project ({f1:[{f2:[[any]],f3:[any]}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- DATASOURCE_SCAN  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
new file mode 100644
index 0000000..94921f4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
@@ -0,0 +1,56 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$66}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$66] <- [agg-sql-sum($$75)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$75] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$70, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$70] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$67) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    subplan {
+                              aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                select (eq($$68, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |LOCAL|
+                                  assign [$$68] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$e <- scan-collection($$67) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$67]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$67] <- [$$p.getField("val1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- DATASOURCE_SCAN  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
new file mode 100644
index 0000000..1de2926
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
@@ -0,0 +1,56 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$66}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$66] <- [agg-sql-sum($$75)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$75] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$70, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$70] <- [$$e.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$67) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    subplan {
+                              aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                select (eq($$68, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |LOCAL|
+                                  assign [$$68] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$e <- scan-collection($$67) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$67]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$67] <- [$$p.getField("val1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- DATASOURCE_SCAN  |PARTITIONED|
+                              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.10.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.10.plan
new file mode 100644
index 0000000..b7ac895
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.10.plan
@@ -0,0 +1,58 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$66}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$66] <- [agg-sql-sum($$75)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$75] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$70, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$70] <- [$$e.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$69) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    project ([$$69, $$53]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      subplan {
+                                aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- AGGREGATE  |LOCAL|
+                                  select (eq($$68, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_SELECT  |LOCAL|
+                                    assign [$$68] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ASSIGN  |LOCAL|
+                                      unnest $$e <- scan-collection($$67) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- UNNEST  |LOCAL|
+                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                             } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- SUBPLAN  |PARTITIONED|
+                        project ([$$69, $$67]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$69, $$67] <- [$$p.getField("val2"), $$p.getField("val1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- ASSIGN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$p] <- test.ParquetDataset1 project ({val2:[{y:any}],val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.11.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.11.plan
new file mode 100644
index 0000000..bb6814a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.11.plan
@@ -0,0 +1,22 @@
+distribute result [$$31] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$31]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$31] <- [{"$1": get-item(get-item(get-item($$32, 0).getField("f2"), 0), 0), "$2": get-item($$46.getField("f3"), 1), "$3": get-item(get-item($$46.getField("f2"), 1), 1)}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ASSIGN  |PARTITIONED|
+        assign [$$46] <- [get-item($$32, 1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$32]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            assign [$$32] <- [$$p.getField("f1")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ASSIGN  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$p] <- test.ParquetDataset1 project ({f1:[{f2:[[any]],f3:[any]}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.016.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.016.plan
new file mode 100644
index 0000000..a06989f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.016.plan
@@ -0,0 +1,58 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$67}] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$67] <- [agg-sql-sum($$72)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$72] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$71, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$71] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    subplan {
+                              aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                select (eq($$69, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |LOCAL|
+                                  assign [$$69] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$68]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$68] <- [$$c.getField("val1")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$c]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$66, $$c] <- test.ColumnDataset project ({val1:[{x:any}]}) filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("x"), 2)) range-filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("x"), 2)) [cardinality: 2.0, op-cost: 2.0, total-cost: 2.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.017.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.017.plan
new file mode 100644
index 0000000..d108091
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.017.plan
@@ -0,0 +1,58 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$67}] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$67] <- [agg-sql-sum($$72)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$72] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$71, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$71] <- [$$e.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    subplan {
+                              aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                select (eq($$69, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- STREAM_SELECT  |LOCAL|
+                                  assign [$$69] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$68]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$$68] <- [$$c.getField("val1")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                        -- ASSIGN  |PARTITIONED|
+                          project ([$$c]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                          -- STREAM_PROJECT  |PARTITIONED|
+                            exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              data-scan []<-[$$66, $$c] <- test.ColumnDataset project ({val1:[{x:any,y:any}]}) filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("y"), 2)) range-filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val1")).getField("y"), 2)) [cardinality: 2.0, op-cost: 2.0, total-cost: 2.0]
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.018.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.018.plan
new file mode 100644
index 0000000..6f485c7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.018.plan
@@ -0,0 +1,60 @@
+distribute result [$$65] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    project ([$$65]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+    -- STREAM_PROJECT  |UNPARTITIONED|
+      assign [$$65] <- [{"$1": $$67}] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+      -- ASSIGN  |UNPARTITIONED|
+        aggregate [$$67] <- [agg-sql-sum($$72)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+        -- AGGREGATE  |UNPARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+            aggregate [$$72] <- [agg-sql-count(1)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+            -- AGGREGATE  |PARTITIONED|
+              select (or($$53, $$57)) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+              -- STREAM_SELECT  |PARTITIONED|
+                project ([$$53, $$57]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                -- STREAM_PROJECT  |PARTITIONED|
+                  subplan {
+                            aggregate [$$57] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- AGGREGATE  |LOCAL|
+                              select (eq($$71, 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |LOCAL|
+                                assign [$$71] <- [$$e.getField("y")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  unnest $$e <- scan-collection($$70) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- UNNEST  |LOCAL|
+                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
+                         } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                  -- SUBPLAN  |PARTITIONED|
+                    project ([$$70, $$53]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      subplan {
+                                aggregate [$$53] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- AGGREGATE  |LOCAL|
+                                  select (eq($$69, 1)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_SELECT  |LOCAL|
+                                    assign [$$69] <- [$$e.getField("x")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ASSIGN  |LOCAL|
+                                      unnest $$e <- scan-collection($$68) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- UNNEST  |LOCAL|
+                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
+                             } [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                      -- SUBPLAN  |PARTITIONED|
+                        project ([$$70, $$68]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          assign [$$70, $$68] <- [$$c.getField("val2"), $$c.getField("val1")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                          -- ASSIGN  |PARTITIONED|
+                            project ([$$c]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                data-scan []<-[$$66, $$c] <- test.ColumnDataset project ({val2:[{y:any}],val1:[{x:any}]}) filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val2")).getField("y"), 2)) range-filter on: or(eq(scan-collection($$c.getField("val1")).getField("x"), 1), eq(scan-collection($$c.getField("val2")).getField("y"), 2)) [cardinality: 2.0, op-cost: 2.0, total-cost: 2.0]
+                                -- DATASOURCE_SCAN  |PARTITIONED|
+                                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.019.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.019.plan
new file mode 100644
index 0000000..fb96640
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.019.plan
@@ -0,0 +1,24 @@
+distribute result [$$31] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$31]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      assign [$$31] <- [{"$1": get-item(get-item(get-item($$32, 0).getField("f2"), 0), 0), "$2": get-item($$47.getField("f3"), 1), "$3": get-item(get-item($$47.getField("f2"), 1), 1)}] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+      -- ASSIGN  |PARTITIONED|
+        assign [$$47] <- [get-item($$32, 1)] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$32]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            assign [$$32] <- [$$c.getField("f1")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+            -- ASSIGN  |PARTITIONED|
+              project ([$$c]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  data-scan []<-[$$33, $$c] <- test.ColumnDataset project ({f1:[{f2:[[any]],f3:[any]}]}) [cardinality: 2.0, op-cost: 2.0, total-cost: 2.0]
+                  -- DATASOURCE_SCAN  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|