[ASTERIXDB-3331][COMP] Avoid pushing agg functions in columnar filter

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Aggregate functions should not be pushed down to data-scan
in columnar filter.

Change-Id: I6826279e24f92aad7ae7cd7de6b74a3811a57183
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18011
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/001/001.027.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/001/001.027.query.sqlpp
new file mode 100644
index 0000000..72abcf9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/001/001.027.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+SET `compiler.column.filter` "true";
+
+
+
+SELECT VALUE d
+FROM ColumnDataset d
+-- Should not be pushed. The expression is translated to neq(count(d.array), 0)
+-- Since count() is an aggregate function, the filter should not be pushed.
+WHERE EXISTS d.array
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/001/001.028.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/001/001.028.query.sqlpp
new file mode 100644
index 0000000..1fc3d9a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/filter/001/001.028.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+SET `compiler.column.filter` "true";
+SET `compiler.parallelism` "0";
+SET `compiler.sort.parallel` "false";
+EXPLAIN
+SELECT VALUE d
+FROM ColumnDataset d
+-- Should not be pushed. The predicate is translated to neq(count(d.array), 0)
+-- Since count() is an aggregate function, the filter should not be pushed.
+WHERE EXISTS d.array
+ORDER BY d.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/001/001.027.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/001/001.027.adm
new file mode 100644
index 0000000..2a830ed
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/001/001.027.adm
@@ -0,0 +1,6 @@
+{ "id": 1, "a": "1", "array": [ 10, 20, 30 ] }
+{ "id": 2, "a": "2", "array": [ 40, 50, 60 ] }
+{ "id": 3, "a": "3", "array": [ 70, 80, 90 ] }
+{ "id": 4, "a": "4", "array": [ 100, 200, 300 ] }
+{ "id": 5, "a": "5", "array": [ 400, 500, 600 ] }
+{ "id": 6, "a": "6", "array": [ 700, 800, 900 ] }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/001/001.028.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/001/001.028.plan
new file mode 100644
index 0000000..09484f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/filter/001/001.028.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$d]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+    -- STREAM_PROJECT  |PARTITIONED|
+      exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+      -- SORT_MERGE_EXCHANGE [$$16(ASC) ]  |PARTITIONED|
+        order (ASC, $$16) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+        -- STABLE_SORT [$$16(ASC)]  |PARTITIONED|
+          exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            select (neq(count($$d.getField("array")), 0)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+            -- STREAM_SELECT  |PARTITIONED|
+              exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$16, $$d] <- test.ColumnDataset [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/001/001.028.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/001/001.028.plan
new file mode 100644
index 0000000..e3a9af1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_cbo/column/filter/001/001.028.plan
@@ -0,0 +1,22 @@
+distribute result [$$d] [cardinality: 6.0, op-cost: 0.0, total-cost: 21.51]
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 21.51]
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    project ([$$d]) [cardinality: 6.0, op-cost: 0.0, total-cost: 21.51]
+    -- STREAM_PROJECT  |PARTITIONED|
+      exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 21.51]
+      -- SORT_MERGE_EXCHANGE [$$16(ASC) ]  |PARTITIONED|
+        order (ASC, $$16) [cardinality: 6.0, op-cost: 15.51, total-cost: 21.51]
+        -- STABLE_SORT [$$16(ASC)]  |PARTITIONED|
+          exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            select (neq(count($$d.getField("array")), 0)) [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+            -- STREAM_SELECT  |PARTITIONED|
+              exchange [cardinality: 6.0, op-cost: 0.0, total-cost: 6.0]
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                data-scan []<-[$$16, $$d] <- test.ColumnDataset [cardinality: 6.0, op-cost: 6.0, total-cost: 6.0]
+                -- DATASOURCE_SCAN  |PARTITIONED|
+                  exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
index 86fa2bf..ced2a25 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/PushdownUtil.java
@@ -147,7 +147,7 @@
     }
 
     public static boolean isNestedFunction(FunctionIdentifier fid) {
-        return isObjectFunction(fid) || isArrayFunction(fid) || BuiltinFunctions.DEEP_EQUAL.equals(fid);
+        return isObjectFunction(fid) || isArrayOrAggregateFunction(fid) || BuiltinFunctions.DEEP_EQUAL.equals(fid);
     }
 
     public static boolean isObjectFunction(FunctionIdentifier fid) {
@@ -155,10 +155,11 @@
         return functionName.contains("object") || BuiltinFunctions.PAIRS.equals(fid);
     }
 
-    public static boolean isArrayFunction(FunctionIdentifier fid) {
+    public static boolean isArrayOrAggregateFunction(FunctionIdentifier fid) {
         String functionName = fid.getName();
         return functionName.startsWith("array") || functionName.startsWith("strict") || functionName.startsWith("sql")
-                || BuiltinFunctions.GET_ITEM.equals(fid);
+                || BuiltinFunctions.GET_ITEM.equals(fid) || BuiltinFunctions.isBuiltinScalarAggregateFunction(fid)
+                || BuiltinFunctions.isBuiltinAggregateFunction(fid);
     }
 
     public static boolean isSameFunction(ILogicalExpression expr1, ILogicalExpression expr2) {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index 0659d00..255af53 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -2780,19 +2780,33 @@
     }
 
     public enum WindowFunctionProperty implements BuiltinFunctionProperty {
-        /** Whether the order clause is prohibited */
+        /**
+         * Whether the order clause is prohibited
+         */
         NO_ORDER_CLAUSE,
-        /** Whether the frame clause is prohibited */
+        /**
+         * Whether the frame clause is prohibited
+         */
         NO_FRAME_CLAUSE,
-        /** Whether the first argument is a list */
+        /**
+         * Whether the first argument is a list
+         */
         HAS_LIST_ARG,
-        /** Whether order by expressions must be injected as arguments */
+        /**
+         * Whether order by expressions must be injected as arguments
+         */
         INJECT_ORDER_ARGS,
-        /** Whether a running aggregate requires partition materialization runtime */
+        /**
+         * Whether a running aggregate requires partition materialization runtime
+         */
         MATERIALIZE_PARTITION,
-        /** Whether FROM (FIRST | LAST) modifier is allowed */
+        /**
+         * Whether FROM (FIRST | LAST) modifier is allowed
+         */
         ALLOW_FROM_FIRST_LAST,
-        /** Whether (RESPECT | IGNORE) NULLS modifier is allowed */
+        /**
+         * Whether (RESPECT | IGNORE) NULLS modifier is allowed
+         */
         ALLOW_RESPECT_IGNORE_NULLS
     }
 
@@ -2823,7 +2837,9 @@
     }
 
     public enum DataSourceFunctionProperty implements BuiltinFunctionProperty {
-        /** Force minimum memory budget if a query only uses this function */
+        /**
+         * Force minimum memory budget if a query only uses this function
+         */
         MIN_MEMORY_BUDGET
     }
 
@@ -2863,6 +2879,10 @@
         return builtinAggregateFunctions.contains(fi);
     }
 
+    public static boolean isBuiltinScalarAggregateFunction(FunctionIdentifier fi) {
+        return scalarToAggregateFunctionMap.containsKey(fi);
+    }
+
     public static boolean isBuiltinUnnestingFunction(FunctionIdentifier fi) {
         return builtinUnnestingFunctions.containsKey(fi);
     }