[ASTERIXDB-3471][COMP] Fix pushing down repeated expressions

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
When computing an expected schema node for a repeated
expression, the compiler creates a new node of type
ANY and tries to add or replace it. The issue is that
the newly created ANY node for the repeated expression
is replaceable and that fails the condition of
adding/replacing a node -- hence the thrown exception.
The add/replace node should ignore the newly created
ANY node if a node exists for the same expression.

Ext-ref: MB-62906
Change-Id: I6f25efc5e247d474ebbb6dba0b951aa0fe897ea3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18546
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
index 7b26cf6..4ca0dd8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
@@ -105,6 +105,13 @@
                 || newType == ExpectedSchemaNodeType.UNION || !newNode.allowsReplacing());
     }
 
+    /**
+     * @return true if {@code newNode} is a replaceable {@link ExpectedSchemaNodeType#ANY} node, false otherwise
+     */
+    protected boolean isReplaceableAny(IExpectedSchemaNode newNode) {
+        return newNode.getType() == ExpectedSchemaNodeType.ANY && newNode.allowsReplacing();
+    }
+
     public static AbstractComplexExpectedSchemaNode createNestedNode(ExpectedSchemaNodeType type,
             AbstractComplexExpectedSchemaNode parent, SourceLocation sourceLocation, String functionName) {
         switch (type) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
index 57e0d98..e647862 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
@@ -48,8 +48,8 @@
 
     @Override
     public IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
-        if (child.getType() == newNode.getType()) {
-            // We are trying to replace with the same node type
+        if (child.getType() == newNode.getType() || isReplaceableAny(newNode)) {
+            // We are trying to replace with the same node type, or with a replaceable any, ignore.
             return child;
         } else if (isChildReplaceable(child, newNode)) {
             child = newNode;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
index aa48e60..a6d4399 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
@@ -58,8 +58,8 @@
     public IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
         String fieldName = getChildFieldName(oldNode);
         IExpectedSchemaNode child = children.get(fieldName);
-        if (child.getType() == newNode.getType()) {
-            // We are trying to replace with the same node type
+        if (child.getType() == newNode.getType() || isReplaceableAny(newNode)) {
+            // We are trying to replace with the same node type, or with a replaceable any, ignore.
             return child;
         } else if (isChildReplaceable(child, newNode)) {
             children.replace(fieldName, newNode);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.022.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.022.query.sqlpp
new file mode 100644
index 0000000..904651b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.022.query.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- See ASTERIXDB-3409
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT ht.name, ht.phone,
+ARRAY_COUNT(ht.reviews) AS num_reviews,
+(SELECT VALUE MIN(ratings.Overall) FROM ht.reviews)[0] AS overall_avg,
+(SELECT VALUE ratings.Overall FROM ht.reviews) AS overall_reviews
+FROM ColumnDataset ht
+WHERE ht.city = 'Los Angeles'
+ORDER BY overall_avg DESC
+LIMIT 5;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.023.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.023.query.sqlpp
new file mode 100644
index 0000000..a841c75
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/pushdown/other-pushdowns/other-pushdowns.023.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- See ASTERIXDB-3471
+USE test;
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT v.payload
+FROM ColumnDataset o
+WHERE v.`type`="WeMo"
+LIMIT 10;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.022.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.022.plan
new file mode 100644
index 0000000..05bf25c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.022.plan
@@ -0,0 +1,68 @@
+distribute result [$$84] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 5 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      project ([$$84]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- STREAM_PROJECT  |PARTITIONED|
+        assign [$$84] <- [{"name": $$93, "phone": $$94, "num_reviews": sql-count($$91), "overall_avg": $#2, "overall_reviews": $$83}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- SORT_MERGE_EXCHANGE [$#2(DESC) ]  |PARTITIONED|
+            limit 5 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_LIMIT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                order (topK: 5) (DESC, $#2) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- STABLE_SORT [topK: 5] [$#2(DESC)]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    subplan {
+                              aggregate [$$83] <- [listify($$82)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                assign [$$82] <- [$$98.getField("Overall")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  assign [$$98] <- [$$reviews.getField("ratings")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$reviews <- scan-collection($$91) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$93, $$94, $$91, $#2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$#2] <- [get-item($$72, 0)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ASSIGN  |PARTITIONED|
+                          subplan {
+                                    aggregate [$$72] <- [listify($$86)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- AGGREGATE  |LOCAL|
+                                      aggregate [$$86] <- [agg-sql-min($$69)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- AGGREGATE  |LOCAL|
+                                        assign [$$69] <- [$$97.getField("Overall")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ASSIGN  |LOCAL|
+                                          assign [$$97] <- [$$89.getField("ratings")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |LOCAL|
+                                            unnest $$89 <- scan-collection($$91) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- UNNEST  |LOCAL|
+                                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- SUBPLAN  |PARTITIONED|
+                            project ([$$93, $$94, $$91]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              select (eq($$ht.getField("city"), "Los Angeles")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- STREAM_SELECT  |PARTITIONED|
+                                assign [$$94, $$91, $$93] <- [$$ht.getField("phone"), $$ht.getField("reviews"), $$ht.getField("name")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |PARTITIONED|
+                                  project ([$$ht]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      data-scan []<-[$$85, $$ht] <- test.ColumnDataset project ({reviews:[{ratings:{Overall:any}}],phone:any,city:any,name:any}) filter on: eq($$ht.getField("city"), "Los Angeles") range-filter on: eq($$ht.getField("city"), "Los Angeles") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.023.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.023.plan
new file mode 100644
index 0000000..7ddc1a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/pushdown/other-pushdowns/other-pushdowns.023.plan
@@ -0,0 +1,28 @@
+distribute result [$$19] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 10 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        project ([$$19]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          assign [$$19] <- [{"payload": $$20.getField("payload")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ASSIGN  |PARTITIONED|
+            limit 10 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_LIMIT  |PARTITIONED|
+              project ([$$20]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$20] <- [$$o.getField("v")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$o]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$21, $$o] <- test.ColumnDataset condition (eq($$o.getField("v").getField("type"), "WeMo")) limit 10 project ({v:{payload:any,type:any}}) filter on: eq($$o.getField("v").getField("type"), "WeMo") range-filter on: eq($$o.getField("v").getField("type"), "WeMo") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.022.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.022.plan
new file mode 100644
index 0000000..9997dae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.022.plan
@@ -0,0 +1,68 @@
+distribute result [$$84] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 5 [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      project ([$$84]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+      -- STREAM_PROJECT  |PARTITIONED|
+        assign [$$84] <- [{"name": $$93, "phone": $$94, "num_reviews": sql-count($$91), "overall_avg": $#2, "overall_reviews": $$83}] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+        -- ASSIGN  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+          -- SORT_MERGE_EXCHANGE [$#2(DESC) ]  |PARTITIONED|
+            limit 5 [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+            -- STREAM_LIMIT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                order (topK: 5) (DESC, $#2) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                -- STABLE_SORT [topK: 5] [$#2(DESC)]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    subplan {
+                              aggregate [$$83] <- [listify($$82)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- AGGREGATE  |LOCAL|
+                                assign [$$82] <- [$$98.getField("Overall")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                -- ASSIGN  |LOCAL|
+                                  assign [$$98] <- [$$reviews.getField("ratings")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                  -- ASSIGN  |LOCAL|
+                                    unnest $$reviews <- scan-collection($$91) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- UNNEST  |LOCAL|
+                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                    -- SUBPLAN  |PARTITIONED|
+                      project ([$$93, $$94, $$91, $#2]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        assign [$#2] <- [get-item($$72, 0)] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                        -- ASSIGN  |PARTITIONED|
+                          subplan {
+                                    aggregate [$$72] <- [listify($$86)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                    -- AGGREGATE  |LOCAL|
+                                      aggregate [$$86] <- [agg-sql-min($$69)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                      -- AGGREGATE  |LOCAL|
+                                        assign [$$69] <- [$$97.getField("Overall")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ASSIGN  |LOCAL|
+                                          assign [$$97] <- [$$89.getField("ratings")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- ASSIGN  |LOCAL|
+                                            unnest $$89 <- scan-collection($$91) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                            -- UNNEST  |LOCAL|
+                                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                          -- SUBPLAN  |PARTITIONED|
+                            project ([$$93, $$94, $$91]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              select (eq($$ht.getField("city"), "Los Angeles")) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                              -- STREAM_SELECT  |PARTITIONED|
+                                assign [$$94, $$91, $$93] <- [$$ht.getField("phone"), $$ht.getField("reviews"), $$ht.getField("name")] [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                                -- ASSIGN  |PARTITIONED|
+                                  project ([$$ht]) [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                                  -- STREAM_PROJECT  |PARTITIONED|
+                                    exchange [cardinality: 2.0, op-cost: 0.0, total-cost: 2.0]
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      data-scan []<-[$$85, $$ht] <- test.ColumnDataset project ({reviews:[{ratings:{Overall:any}}],phone:any,city:any,name:any}) filter on: eq($$ht.getField("city"), "Los Angeles") range-filter on: eq($$ht.getField("city"), "Los Angeles") [cardinality: 2.0, op-cost: 2.0, total-cost: 2.0]
+                                      -- DATASOURCE_SCAN  |PARTITIONED|
+                                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.023.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.023.plan
new file mode 100644
index 0000000..f685929
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/pushdown/other-pushdowns/other-pushdowns.023.plan
@@ -0,0 +1,28 @@
+distribute result [$$19] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+-- DISTRIBUTE_RESULT  |UNPARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+  -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+    limit 10 [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+    -- STREAM_LIMIT  |UNPARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+      -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+        project ([$$19]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+        -- STREAM_PROJECT  |PARTITIONED|
+          assign [$$19] <- [{"payload": $$20.getField("payload")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+          -- ASSIGN  |PARTITIONED|
+            limit 10 [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+            -- STREAM_LIMIT  |PARTITIONED|
+              project ([$$20]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+              -- STREAM_PROJECT  |PARTITIONED|
+                assign [$$20] <- [$$o.getField("v")] [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                -- ASSIGN  |PARTITIONED|
+                  project ([$$o]) [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 2.0]
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      data-scan []<-[$$21, $$o] <- test.ColumnDataset condition (eq($$o.getField("v").getField("type"), "WeMo")) limit 10 project ({v:{payload:any,type:any}}) filter on: eq($$o.getField("v").getField("type"), "WeMo") range-filter on: eq($$o.getField("v").getField("type"), "WeMo") [cardinality: 0.0, op-cost: 2.0, total-cost: 2.0]
+                      -- DATASOURCE_SCAN  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|