[ASTERIXDB-3575][EXT] Pushdown predicates for Parquet external datasets to filter row groups

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-65316
Change-Id: I2c3214e2a351252fb1929aa1562cbab2d67fa9aa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19633
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index a101bed..bd9e329 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -120,6 +121,7 @@
         // Performs prefix pushdowns
         pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor.add(new ParquetFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor
                 .add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
         // Inlines AND/OR expression (must be last to run)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
index 1622c6e..c4210a3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownContext.java
@@ -26,7 +26,9 @@
 import java.util.Set;
 
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.DefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.FilterExpressionInlineVisitor;
@@ -84,8 +86,15 @@
 
     public void registerScan(Dataset dataset, List<LogicalVariable> pkList, LogicalVariable recordVariable,
             LogicalVariable metaVariable, AbstractScanOperator scanOperator) {
-        ScanDefineDescriptor scanDefDesc =
-                new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
+        ScanDefineDescriptor scanDefDesc;
+        if (DatasetUtil.isParquetFormat(dataset)) {
+            scanDefDesc = new ParquetDatasetScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable,
+                    metaVariable, scanOperator);
+        } else {
+            scanDefDesc = new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable,
+                    scanOperator);
+        }
+        new ScanDefineDescriptor(scopes.size(), dataset, pkList, recordVariable, metaVariable, scanOperator);
         defineChain.put(recordVariable, scanDefDesc);
         useChain.put(recordVariable, new ArrayList<>());
         if (metaVariable != null) {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 023e4da..cf438d8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -31,12 +31,14 @@
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
 import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -121,6 +123,12 @@
         Map<String, String> configuration = ((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties();
         boolean embedFilterValues = ExternalDataPrefix.containsComputedFields(configuration) && Boolean.parseBoolean(
                 configuration.getOrDefault(ExternalDataConstants.KEY_EMBED_FILTER_VALUES, ExternalDataConstants.TRUE));
+        if (DatasetUtil.isParquetFormat(dataset)) {
+            return new ParquetExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
+                    scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(),
+                    ((ParquetDatasetScanDefineDescriptor) scanDefineDescriptor).getRowGroupFilterExpression(),
+                    embedFilterValues);
+        }
         return new ExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
                 scanDefineDescriptor.getFilterPaths(), scanDefineDescriptor.getFilterExpression(), embedFilterValues);
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
new file mode 100644
index 0000000..4b4d742
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/descriptor/ParquetDatasetScanDefineDescriptor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.descriptor;
+
+import java.util.List;
+
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public class ParquetDatasetScanDefineDescriptor extends ScanDefineDescriptor {
+
+    private ILogicalExpression rowGroupFilterExpression;
+
+    public ParquetDatasetScanDefineDescriptor(int scope, Dataset dataset, List<LogicalVariable> primaryKeyVariables,
+            LogicalVariable recordVariable, LogicalVariable metaRecordVariable, ILogicalOperator operator) {
+        super(scope, dataset, primaryKeyVariables, recordVariable, metaRecordVariable, operator);
+        this.rowGroupFilterExpression = null;
+    }
+
+    public ILogicalExpression getRowGroupFilterExpression() {
+        return rowGroupFilterExpression;
+    }
+
+    public void setRowGroupFilterExpression(ILogicalExpression rowGroupFilterExpression) {
+        this.rowGroupFilterExpression = rowGroupFilterExpression;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 1ec64d5..0d79767 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -61,7 +61,7 @@
     protected final ExpressionToExpectedSchemaNodeVisitor exprToNodeVisitor;
     protected final ColumnFilterPathBuilderVisitor pathBuilderVisitor;
     protected final Map<ILogicalExpression, ARecordType> paths;
-    private final ArrayPathCheckerVisitor checkerVisitor;
+    protected final ArrayPathCheckerVisitor checkerVisitor;
 
     public ColumnFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
         super(pushdownContext, context);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
new file mode 100644
index 0000000..25db9f9
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ParquetFilterPushdownProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.RANGE_FILTER_PUSHABLE_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ParquetDatasetScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ParquetFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+
+    public ParquetFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+        return !DatasetUtil.isParquetFormat(scanDefineDescriptor.getDataset());
+    }
+
+    @Override
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return !RANGE_FILTER_PUSHABLE_FUNCTIONS.contains(expression.getFunctionIdentifier());
+    }
+
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        paths.put(expression, expressionPath);
+        return true;
+    }
+
+    @Override
+    protected void putFilterInformation(ScanDefineDescriptor scanDefineDescriptor, ILogicalExpression inlinedExpr)
+            throws AlgebricksException {
+        if (checkerVisitor.containsMultipleArrayPaths(paths.values())) {
+            // Cannot pushdown a filter with multiple unnest
+            // TODO allow rewindable column readers for filters
+            // TODO this is a bit conservative (maybe too conservative) as we can push part of expression down
+            return;
+        }
+        ParquetDatasetScanDefineDescriptor scanDefDesc = (ParquetDatasetScanDefineDescriptor) scanDefineDescriptor;
+        ILogicalExpression filterExpr = scanDefDesc.getRowGroupFilterExpression();
+        if (filterExpr != null) {
+            filterExpr = andExpression(filterExpr, inlinedExpr);
+            scanDefDesc.setRowGroupFilterExpression(filterExpr);
+        } else {
+            scanDefDesc.setRowGroupFilterExpression(inlinedExpr);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
index b560a5b..805b9aa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.011.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
index 2bfad06..a57424c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.021.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") embed-filter-value: true row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
index afb74f1..5e23176 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.031.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department embed-filter-value: true row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
index fc48056..5832f4f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/embed-one-value/one-field.121.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") embed-filter-value: true row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
index 8dcaa95..ecbd5ee 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.011.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
index a99cc0c..35da948 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.021.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department prefix-filter on: eq($$d.getField("department"), "accounting") row-group-filter on: and(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
index a7c5727..23892f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.031.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.Department [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.Department row-group-filter on: or(eq($$d.getField("department"), "accounting"), eq($$d.getField("name").getField("last"), "Smith")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
index b48def6..9fb8ee1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/dynamic-prefixes/parquet/one-field/one-field.121.plan
@@ -16,7 +16,7 @@
               -- ASSIGN  |PARTITIONED|
                 exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                  data-scan []<-[$$d] <- test.LastName prefix-filter on: eq($$d.getField("name").getField("last"), "Jones") row-group-filter on: eq($$d.getField("name").getField("last"), "Jones") [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- DATASOURCE_SCAN  |PARTITIONED|
                     exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
index 2b41307..9dab9c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.03.plan
@@ -14,7 +14,7 @@
             -- STREAM_SELECT  |PARTITIONED|
               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                data-scan []<-[$$p1] <- test.ParquetDataset1 [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                data-scan []<-[$$p1] <- test.ParquetDataset1 row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                 -- DATASOURCE_SCAN  |PARTITIONED|
                   exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
index 3993a4d..2fca35d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.04.plan
@@ -36,7 +36,7 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:any},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                             -- DATASOURCE_SCAN  |PARTITIONED|
                               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
index 9b93e19..945f2da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.05.plan
@@ -36,7 +36,7 @@
                         -- STREAM_SELECT  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                            data-scan []<-[$$p1] <- test.ParquetDataset1 project ({entities:{hashtags:[{indices:any,text:any}]},id:any}) row-group-filter on: gt($$p1.getField("id"), 10) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                             -- DATASOURCE_SCAN  |PARTITIONED|
                               exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
index cccf9a3..6941720 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.08.plan
@@ -44,7 +44,7 @@
                     -- ASSIGN  |PARTITIONED|
                       exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("x"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                         -- DATASOURCE_SCAN  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
index cf265bf..316e465 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/parquet/pushdown-plans/pushdown-plans.09.plan
@@ -44,7 +44,7 @@
                     -- ASSIGN  |PARTITIONED|
                       exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
+                        data-scan []<-[$$p] <- test.ParquetDataset1 project ({val1:[{x:any,y:any}]}) row-group-filter on: or(eq(scan-collection($$p.getField("val1")).getField("x"), 1), eq(scan-collection($$p.getField("val1")).getField("y"), 2)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                         -- DATASOURCE_SCAN  |PARTITIONED|
                           exchange [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
new file mode 100644
index 0000000..774e908
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ParquetFilterEvaluatorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+
+public class ParquetFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final FilterPredicate filterExpression;
+    private final IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory;
+
+    public ParquetFilterEvaluatorFactory(IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory,
+            FilterPredicate expression) {
+        this.externalFilterEvaluatorFactory = externalFilterEvaluatorFactory;
+        this.filterExpression = expression;
+    }
+
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        return externalFilterEvaluatorFactory.create(serviceContext, warningCollector);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        return externalFilterEvaluatorFactory.createValueEmbedder(warningCollector);
+    }
+
+    public FilterPredicate getFilterExpression() {
+        return filterExpression;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 2d92e10..f5dc7a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -44,6 +45,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.amazonaws.SdkBaseException;
 
@@ -91,6 +94,13 @@
             IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
             configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions);
             configureHdfsConf(conf, configuration);
+            if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+                FilterPredicate parquetFilterPredicate =
+                        ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+                if (parquetFilterPredicate != null) {
+                    ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+                }
+            }
         } catch (SdkException | SdkBaseException ex) {
             throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
         } catch (AlgebricksException ex) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 530ce74..72c9977 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -41,6 +42,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobItem;
@@ -82,6 +85,13 @@
         JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
+        if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+            FilterPredicate parquetFilterPredicate =
+                    ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+            if (parquetFilterPredicate != null) {
+                ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 4dedb08..1db4445 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
@@ -40,6 +41,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.PathItem;
@@ -69,6 +72,13 @@
         JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
+        if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+            FilterPredicate parquetFilterPredicate =
+                    ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+            if (parquetFilterPredicate != null) {
+                ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 874c3bd..9469747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
@@ -39,6 +40,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import com.google.cloud.storage.Blob;
 
@@ -76,6 +79,13 @@
         int numberOfPartitions = getPartitionConstraint().getLocations().length;
         GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
         configureHdfsConf(conf, configuration);
+        if (filterEvaluatorFactory instanceof ParquetFilterEvaluatorFactory) {
+            FilterPredicate parquetFilterPredicate =
+                    ((ParquetFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+            if (parquetFilterPredicate != null) {
+                ParquetInputFormat.setFilterPredicate(conf, parquetFilterPredicate);
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 2551b86..0cd150e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -854,4 +854,9 @@
         return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
                 .isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
     }
+
+    public static boolean isParquetFormat(Dataset dataset) {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
+                .isParquetFormat(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 5939290..dfefb8f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.metadata.utils;
 
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
@@ -43,6 +44,7 @@
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
 import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -53,6 +55,7 @@
 import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.ParquetFilterBuilder;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.IAObject;
 import org.apache.asterix.om.types.ARecordType;
@@ -61,6 +64,7 @@
 import org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -346,6 +350,25 @@
                         (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
                 return builder.build();
             }
+        } else if (isParquetFormat(properties)) {
+            if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+                return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+            } else {
+                ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+                ExternalDatasetProjectionFiltrationInfo pfi =
+                        (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
+                IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory =
+                        NoOpExternalFilterEvaluatorFactory.INSTANCE;
+                if (!prefix.getPaths().isEmpty()) {
+                    ExternalFilterBuilder externalFilterBuilder =
+                            new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+                    externalFilterEvaluatorFactory = externalFilterBuilder.build();
+                }
+                ParquetFilterBuilder builder = new ParquetFilterBuilder(
+                        (ParquetExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
+                return new ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
+                        builder.buildFilterPredicate());
+            }
         } else {
             if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
                 return NoOpExternalFilterEvaluatorFactory.INSTANCE;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
new file mode 100644
index 0000000..38ad119
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ParquetFilterBuilder.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.projection.ParquetExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.io.api.Binary;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Predicate;
+
+public class ParquetFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    public ParquetFilterBuilder(ParquetExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getParquetRowGroupFilterExpression(),
+                context, typeEnv);
+    }
+
+    public FilterPredicate buildFilterPredicate() throws AlgebricksException {
+        FilterPredicate parquetFilterPredicate = null;
+        if (filterExpression != null) {
+            try {
+                parquetFilterPredicate = createFilterExpression(filterExpression);
+            } catch (Exception e) {
+                LOGGER.error("Error creating Parquet row-group filter expression ", e);
+            }
+        }
+        if (parquetFilterPredicate != null && !(parquetFilterPredicate instanceof Predicate)) {
+            parquetFilterPredicate = null;
+        }
+        return parquetFilterPredicate;
+    }
+
+    private FilterPredicate createComparisonExpression(ILogicalExpression columnName, ILogicalExpression constValue,
+            FunctionIdentifier fid) throws AlgebricksException {
+        ConstantExpression constExpr = (ConstantExpression) constValue;
+        if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+            throw new RuntimeException("Unsupported literal type: " + constExpr.getValue());
+        }
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        String fieldName = createColumnExpression(columnName);
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return createComparisionFunction(FilterApi.binaryColumn(fieldName),
+                        Binary.fromString(((AString) constantValue.getObject()).getStringValue()), fid);
+            case TINYINT:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt8) constantValue.getObject()).getByteValue(), fid);
+            case SMALLINT:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        (int) ((AInt16) constantValue.getObject()).getShortValue(), fid);
+            case INTEGER:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        ((AInt32) constantValue.getObject()).getIntegerValue(), fid);
+            case BOOLEAN:
+                if (!fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+                    throw new RuntimeException("Unsupported comparison function: " + fid);
+                }
+                return FilterApi.eq(FilterApi.booleanColumn(fieldName), constantValue.isTrue());
+            case BIGINT:
+                return createComparisionFunction(FilterApi.longColumn(fieldName),
+                        ((AInt64) constantValue.getObject()).getLongValue(), fid);
+            case DOUBLE:
+                return createComparisionFunction(FilterApi.doubleColumn(fieldName),
+                        ((ADouble) constantValue.getObject()).getDoubleValue(), fid);
+            case DATE:
+                return createComparisionFunction(FilterApi.intColumn(fieldName),
+                        ((ADate) constantValue.getObject()).getChrononTimeInDays(), fid);
+            case DATETIME:
+                Long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+                return createComparisionFunction(FilterApi.longColumn(fieldName),
+                        TimeUnit.MILLISECONDS.toMicros(millis), fid);
+            default:
+                throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    private FilterPredicate createFilterExpression(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null || expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            throw new RuntimeException("Unsupported expression: " + expr);
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        if (funcExpr.getArguments().size() != 2
+                && !(fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR))) {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            return createAndOrPredicate(fid, args, 0);
+        } else {
+            return createComparisonExpression(args.get(0).getValue(), args.get(1).getValue(), fid);
+        }
+    }
+
+    private <T extends Comparable<T>, C extends Operators.Column<T> & Operators.SupportsLtGt> FilterPredicate createComparisionFunction(
+            C column, T value, FunctionIdentifier fid) {
+        if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return FilterApi.eq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return FilterApi.gtEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return FilterApi.gt(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return FilterApi.ltEq(column, value);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return FilterApi.lt(column, value);
+        } else {
+            throw new RuntimeException("Unsupported function: " + fid);
+        }
+    }
+
+    protected String createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+            // The field could be a nested field
+            List<String> fieldList = new ArrayList<>();
+            fieldList = createPathExpression(path, fieldList);
+            return String.join(".", fieldList);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+            return path.getFieldNames()[0];
+        } else {
+            throw new RuntimeException("Unsupported column expression: " + expression);
+        }
+    }
+
+    private List<String> createPathExpression(ARecordType path, List<String> fieldList) {
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Error creating column expression");
+        } else {
+            fieldList.add(path.getFieldNames()[0]);
+        }
+        if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.OBJECT) {
+            return createPathExpression((ARecordType) path.getFieldTypes()[0], fieldList);
+        } else if (path.getFieldTypes()[0].getTypeTag() == ATypeTag.ANY) {
+            return fieldList;
+        } else {
+            throw new RuntimeException("Error creating column expression");
+        }
+    }
+
+    // Converts or(pred1, pred2, pred3) to or(pred1, or(pred2, pred3))
+    private FilterPredicate createAndOrPredicate(FunctionIdentifier function, List<Mutable<ILogicalExpression>> args,
+            int index) throws AlgebricksException {
+        if (index == args.size() - 2) {
+            if (function.equals(AlgebricksBuiltinFunctions.AND)) {
+                return FilterApi.and(createFilterExpression(args.get(0).getValue()),
+                        createFilterExpression(args.get(1).getValue()));
+            } else {
+                return FilterApi.or(createFilterExpression(args.get(0).getValue()),
+                        createFilterExpression(args.get(1).getValue()));
+            }
+        } else {
+            if (function.equals(AlgebricksBuiltinFunctions.AND)) {
+                return FilterApi.and(createFilterExpression(args.get(index).getValue()),
+                        createAndOrPredicate(function, args, index + 1));
+            } else {
+                return FilterApi.or(createFilterExpression(args.get(index).getValue()),
+                        createAndOrPredicate(function, args, index + 1));
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index e0ea99a..f95483e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -43,7 +43,7 @@
     protected final ARecordType projectedType;
     protected final ILogicalExpression filterExpression;
     protected final Map<String, FunctionCallInformation> functionCallInfoMap;
-    private final boolean embedFilterValues;
+    protected final boolean embedFilterValues;
     protected final Map<ILogicalExpression, ARecordType> filterPaths;
 
     public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
new file mode 100644
index 0000000..323d3ed
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ParquetExternalDatasetProjectionFiltrationInfo.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+public class ParquetExternalDatasetProjectionFiltrationInfo extends ExternalDatasetProjectionFiltrationInfo {
+
+    private ILogicalExpression parquetRowGroupFilterExpression;
+
+    public ParquetExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
+            Map<String, FunctionCallInformation> sourceInformationMap, Map<ILogicalExpression, ARecordType> filterPaths,
+            ILogicalExpression filterExpression, ILogicalExpression parquetRowGroupFilterExpression,
+            boolean embedFilterValues) {
+        super(projectedType, sourceInformationMap, filterPaths, filterExpression, embedFilterValues);
+        this.parquetRowGroupFilterExpression = parquetRowGroupFilterExpression;
+    }
+
+    private ParquetExternalDatasetProjectionFiltrationInfo(ParquetExternalDatasetProjectionFiltrationInfo other) {
+        super(other.projectedType, other.functionCallInfoMap, other.filterPaths, other.filterExpression,
+                other.embedFilterValues);
+        this.parquetRowGroupFilterExpression = other.parquetRowGroupFilterExpression;
+    }
+
+    public ILogicalExpression getParquetRowGroupFilterExpression() {
+        return parquetRowGroupFilterExpression;
+    }
+
+    @Override
+    public void print(AlgebricksStringBuilderWriter writer) {
+        super.print(writer);
+        if (parquetRowGroupFilterExpression != null) {
+            writer.append(" row-group-filter on: ");
+            writer.append(parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public void print(JsonGenerator generator) throws IOException {
+        super.print(generator);
+        if (parquetRowGroupFilterExpression != null) {
+            generator.writeStringField("row-group-filter-on", parquetRowGroupFilterExpression.toString());
+        }
+    }
+
+    @Override
+    public ParquetExternalDatasetProjectionFiltrationInfo createCopy() {
+        return new ParquetExternalDatasetProjectionFiltrationInfo(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParquetExternalDatasetProjectionFiltrationInfo otherInfo = (ParquetExternalDatasetProjectionFiltrationInfo) o;
+        return super.equals(o)
+                && filterExpressionEquals(parquetRowGroupFilterExpression, otherInfo.parquetRowGroupFilterExpression);
+    }
+}