[ASTERIXDB-3582][COMP] Fix expected schema tree generation

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-65792
Change-Id: Ic04a618c7aa182af4b1f4b7ade64d687147ef705
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19532
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
index 34ff582..27933c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/AbstractComplexExpectedSchemaNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown.schema;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 
@@ -86,6 +87,9 @@
 
     protected abstract IExpectedSchemaNode replaceChild(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode);
 
+    protected abstract IExpectedSchemaNode getChildNode(AbstractFunctionCallExpression parentExpr)
+            throws AlgebricksException;
+
     /**
      * A child is replaceable if
      * - child is allowed to be replaced
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
index b8135b7..5cab1c0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ArrayExpectedSchemaNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown.schema;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 
@@ -60,4 +61,9 @@
         // This should never happen, but safeguard against unexpected behavior
         throw new IllegalStateException("Cannot replace " + child.getType() + " with " + newNode.getType());
     }
+
+    @Override
+    protected IExpectedSchemaNode getChildNode(AbstractFunctionCallExpression parentExpr) throws AlgebricksException {
+        return child;
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
index f4b6cb0..70bbef1 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ExpectedSchemaBuilder.java
@@ -65,10 +65,12 @@
         AbstractComplexExpectedSchemaNode parent = (AbstractComplexExpectedSchemaNode) buildNestedNode(expr, typeEnv);
         if (parent != null) {
             IExpectedSchemaNode leaf = new AnyExpectedSchemaNode(parent, expr);
+            IExpectedSchemaNode oldChildNode = parent.getChildNode(expr);
             IExpectedSchemaNode actualNode = addOrReplaceChild(expr, typeEnv, parent, leaf);
             if (producedVar != null) {
                 //Register the node if a variable is produced
                 varToNode.put(producedVar, actualNode);
+                updateVarToNodeRef(oldChildNode, actualNode);
             }
         }
 
@@ -93,10 +95,44 @@
                     (AnyExpectedSchemaNode) node.replaceIfNeeded(ExpectedSchemaNodeType.ANY, null, null);
             // make the leaf node irreplaceable
             leafNode.preventReplacing();
+            // if the node has been replaced by the leafNode,
+            // check for the variable that is associated with the node to be replaced.
             varToNode.put(variable, leafNode);
+            updateVarToNodeRef(node, leafNode);
         }
     }
 
+    /**
+     * Updates variable references when schema paths point to the same field access.
+     *
+     * Example Query:
+     * SELECT i
+     * ... FROM orders AS o, o.items AS i
+     * ... WHERE (SOME li IN o.items SATISFIES li.qty < 3)
+     * ... AND i.qty >= 100;
+     *
+     * In this query, both 'i' and 'li' reference the same field access (o.items).
+     * The schema paths evolve as follows:
+     * 1. i.qty creates path: {"items": [{qty: any}]}
+     * 2. li.qty references the same qty node
+     * 3. When processing 'i', we replace i.qty with a broader path: {"items": [any]}
+     *
+     * Since both variables reference the same field, we need to update all references
+     * to the old node (qty-specific) with the new node (array-level) in varToNode cache.
+     *
+     * @param oldNode The original node being replaced (e.g., qty-specific node)
+     * @param newNode The new node replacing it (e.g., array-level node)
+     */
+    private void updateVarToNodeRef(IExpectedSchemaNode oldNode, IExpectedSchemaNode newNode) {
+        // Skip if nodes are null, identical, or oldNode is a root node
+        if (oldNode == null || oldNode == newNode || RootExpectedSchemaNode.isPreDefinedRootNode(oldNode)) {
+            return;
+        }
+
+        // Update all variable references from oldNode to newNode
+        varToNode.replaceAll((var, node) -> node == oldNode ? newNode : node);
+    }
+
     public boolean isVariableRegistered(LogicalVariable variable) {
         return varToNode.containsKey(variable);
     }
@@ -231,6 +267,7 @@
         IExpectedSchemaNode newNode = oldNode.replaceIfNeeded(varExpectedType, parentExpression, expression);
         //Map the sourceVar to the node
         varToNode.put(sourceVar, newNode);
+        updateVarToNodeRef(oldNode, newNode);
         return newNode;
     }
 
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
index e220437..5f33aed 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/ObjectExpectedSchemaNode.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 
 import org.apache.asterix.metadata.utils.PushdownUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -88,12 +89,20 @@
 
     public String getChildFieldName(IExpectedSchemaNode requestedChild) {
         AbstractFunctionCallExpression expr = requestedChild.getParentExpression();
-        int fieldNameId = PushdownUtil.getFieldNameId(requestedChild.getParentExpression());
+        return getChildFieldName(expr);
+    }
 
+    public String getChildFieldName(AbstractFunctionCallExpression parentExpr) {
+        int fieldNameId = PushdownUtil.getFieldNameId(parentExpr);
         if (fieldNameId > -1) {
             return fieldIdToFieldName.get(fieldNameId);
         }
+        return PushdownUtil.getFieldName(parentExpr);
+    }
 
-        return PushdownUtil.getFieldName(expr);
+    @Override
+    public IExpectedSchemaNode getChildNode(AbstractFunctionCallExpression parentExpr) throws AlgebricksException {
+        String fieldName = getChildFieldName(parentExpr);
+        return children.get(fieldName);
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java
index e72e997..354dfea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/RootExpectedSchemaNode.java
@@ -77,4 +77,8 @@
     public boolean isAllFields() {
         return rootType == ALL_FIELDS_ROOT || rootType == ALL_FIELDS_ROOT_IRREPLACEABLE;
     }
+
+    public static boolean isPreDefinedRootNode(IExpectedSchemaNode node) {
+        return node == ALL_FIELDS_ROOT_NODE || node == ALL_FIELDS_ROOT_IRREPLACEABLE_NODE || node == EMPTY_ROOT_NODE;
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
index a15e359..54e090f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/schema/UnionExpectedSchemaNode.java
@@ -18,10 +18,13 @@
  */
 package org.apache.asterix.optimizer.rules.pushdown.schema;
 
+import static org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaBuilder.getExpectedNestedNodeType;
+
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 
@@ -91,4 +94,11 @@
         }
         return this;
     }
+
+    @Override
+    protected IExpectedSchemaNode getChildNode(AbstractFunctionCallExpression parentExpr) throws AlgebricksException {
+        ExpectedSchemaNodeType parentType = getExpectedNestedNodeType(parentExpr);
+        AbstractComplexExpectedSchemaNode actualParent = getChild(parentType);
+        return actualParent.getChildNode(parentExpr);
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582-2.006.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582-2.006.query.sqlpp
new file mode 100644
index 0000000..cd30f96
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582-2.006.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- there was issue in expected schema tree generation for i and li.qty
+SELECT *
+FROM websales.orders AS o, o.items AS i
+WHERE
+(SOME li IN o.items SATISFIES li.qty < 3)
+  AND i.qty >= 100;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582-2.007.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582-2.007.query.sqlpp
new file mode 100644
index 0000000..357a5e2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582-2.007.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+EXPLAIN
+WITH joined AS (
+    SELECT *
+    FROM websales.orders AS o
+    JOIN inventory.products AS p
+    ON o.items.itemno = p.itemno
+)
+SELECT 'joined' AS source, orderno AS id, order_date AS date, name, NULL AS rating
+FROM joined
+UNION ALL
+SELECT 'reviews' AS source, itemno AS id, rev_date AS date, name, rating
+FROM marketing.reviews;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.006.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.006.adm
new file mode 100644
index 0000000..27100ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.006.adm
@@ -0,0 +1 @@
+{ "o": { "orderno": 1005, "custid": "C37", "order_date": "2020-08-30", "items": [ { "itemno": 460, "qty": 2, "price": 29.99 }, { "itemno": 347, "qty": 120, "price": 22.0 }, { "itemno": 780, "qty": 1, "price": 1500.0 }, { "itemno": 375, "qty": 2, "price": 149.98 } ] }, "i": { "itemno": 347, "qty": 120, "price": 22.0 } }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
new file mode 100644
index 0000000..035816f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
@@ -0,0 +1,54 @@
+distribute result [$$86] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    union ($$101, $$102, $$86) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- UNION_ALL  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        assign [$$101] <- [cast({"source": "joined", "id": $$68.getField("orderno"), "date": $$68.getField("order_date"), "name": $$68.getField("name"), "rating": null})] project: [$$101] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          assign [$$68] <- [{"o": $$o, "p": $$p}] project: [$$68] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ASSIGN  |PARTITIONED|
+            project ([$$o, $$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$91, $$88)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$91][$$88]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
+                    assign [$$91] <- [$$o.getField("items").getField("itemno")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$o]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$87, $$o] <- websales.orders [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    data-scan []<-[$$88, $$p] <- inventory.products [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        assign [$$102] <- [cast({"source": "reviews", "id": $$89, "date": $$reviews.getField("rev_date"), "name": $$reviews.getField("name"), "rating": $$reviews.getField("rating")})] project: [$$102] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$89, $$reviews]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              data-scan []<-[$$89, $$90, $$reviews] <- marketing.reviews project ({name:any,rating:any,rev_date:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
new file mode 100644
index 0000000..03786f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
@@ -0,0 +1,54 @@
+distribute result [$$86] [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    union ($$101, $$102, $$86) [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+    -- UNION_ALL  |PARTITIONED|
+      exchange [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        assign [$$101] <- [cast({"source": "joined", "id": $$68.getField("orderno"), "date": $$68.getField("order_date"), "name": $$68.getField("name"), "rating": null})] project: [$$101] [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+        -- ASSIGN  |PARTITIONED|
+          assign [$$68] <- [{"o": $$o, "p": $$p}] project: [$$68] [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+          -- ASSIGN  |PARTITIONED|
+            project ([$$o, $$p]) [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              exchange [cardinality: 9.0, op-cost: 0.0, total-cost: 54.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$91, $$88)) [cardinality: 9.0, op-cost: 18.0, total-cost: 54.0]
+                -- HYBRID_HASH_JOIN [$$91][$$88]  |PARTITIONED|
+                  exchange [cardinality: 9.0, op-cost: 9.0, total-cost: 18.0]
+                  -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
+                    assign [$$91] <- [$$o.getField("items").getField("itemno")] [cardinality: 9.0, op-cost: 0.0, total-cost: 9.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$o]) [cardinality: 9.0, op-cost: 0.0, total-cost: 9.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 9.0, op-cost: 9.0, total-cost: 18.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$87, $$o] <- websales.orders [cardinality: 9.0, op-cost: 9.0, total-cost: 9.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 9.0, op-cost: 9.0, total-cost: 18.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    data-scan []<-[$$88, $$p] <- inventory.products [cardinality: 9.0, op-cost: 9.0, total-cost: 9.0]
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        assign [$$102] <- [cast({"source": "reviews", "id": $$89, "date": $$reviews.getField("rev_date"), "name": $$reviews.getField("name"), "rating": $$reviews.getField("rating")})] project: [$$102] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$89, $$reviews]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              data-scan []<-[$$89, $$90, $$reviews] <- marketing.reviews project ({name:any,rating:any,rev_date:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_full_parallelism/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_full_parallelism/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
new file mode 100644
index 0000000..908f054
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_full_parallelism/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
@@ -0,0 +1,54 @@
+distribute result [$$86] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    union ($$101, $$102, $$86) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- UNION_ALL  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        assign [$$101] <- [cast({"source": "joined", "id": $$68.getField("orderno"), "date": $$68.getField("order_date"), "name": $$68.getField("name"), "rating": null})] project: [$$101] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          assign [$$68] <- [{"o": $$o, "p": $$p}] project: [$$68] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ASSIGN  |PARTITIONED|
+            project ([$$o, $$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$91, $$88)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$91][$$88]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
+                    assign [$$91] <- [$$o.getField("items").getField("itemno")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$o]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$87, $$o] <- websales.orders [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$88]  |PARTITIONED|
+                    data-scan []<-[$$88, $$p] <- inventory.products [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+        assign [$$102] <- [cast({"source": "reviews", "id": $$89, "date": $$reviews.getField("rev_date"), "name": $$reviews.getField("name"), "rating": $$reviews.getField("rating")})] project: [$$102] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$89, $$reviews]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              data-scan []<-[$$89, $$90, $$reviews] <- marketing.reviews project ({name:any,rating:any,rev_date:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_less_parallelism/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_less_parallelism/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
new file mode 100644
index 0000000..908f054
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_less_parallelism/column/filter/ASTERIXDB-3582-2/ASTERIXDB-3582.007.plan
@@ -0,0 +1,54 @@
+distribute result [$$86] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    union ($$101, $$102, $$86) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- UNION_ALL  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        assign [$$101] <- [cast({"source": "joined", "id": $$68.getField("orderno"), "date": $$68.getField("order_date"), "name": $$68.getField("name"), "rating": null})] project: [$$101] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          assign [$$68] <- [{"o": $$o, "p": $$p}] project: [$$68] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ASSIGN  |PARTITIONED|
+            project ([$$o, $$p]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_PROJECT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                join (eq($$91, $$88)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- HYBRID_HASH_JOIN [$$91][$$88]  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$91]  |PARTITIONED|
+                    assign [$$91] <- [$$o.getField("items").getField("itemno")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- ASSIGN  |PARTITIONED|
+                      project ([$$o]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          data-scan []<-[$$87, $$o] <- websales.orders [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                          -- DATASOURCE_SCAN  |PARTITIONED|
+                            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- HASH_PARTITION_EXCHANGE [$$88]  |PARTITIONED|
+                    data-scan []<-[$$88, $$p] <- inventory.products [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- DATASOURCE_SCAN  |PARTITIONED|
+                      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- RANDOM_PARTITION_EXCHANGE  |PARTITIONED|
+        assign [$$102] <- [cast({"source": "reviews", "id": $$89, "date": $$reviews.getField("rev_date"), "name": $$reviews.getField("name"), "rating": $$reviews.getField("rating")})] project: [$$102] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- ASSIGN  |PARTITIONED|
+          project ([$$89, $$reviews]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- STREAM_PROJECT  |PARTITIONED|
+            exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              data-scan []<-[$$89, $$90, $$reviews] <- marketing.reviews project ({name:any,rating:any,rev_date:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|