diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a094c22..121a76b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,12 +19,15 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
@@ -41,6 +44,7 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.internal.data.ScanStateRow;
 import io.delta.kernel.types.StructType;
@@ -65,9 +69,10 @@
     private int fileIndex;
     private Row scanFile;
     private CloseableIterator<Row> rows;
+    private Optional<Predicate> filterPredicate;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config)
-            throws HyracksDataException {
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
+            String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
         this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
@@ -85,15 +90,16 @@
             this.scanFile = scanFiles.get(0);
             this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
             try {
                 this.physicalDataIter = engine.getParquetHandler()
-                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty());
+                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
                 this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
             }
         }
     }
@@ -122,7 +128,7 @@
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
             try {
                 physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                        physicalReadSchema, Optional.empty());
+                        physicalReadSchema, filterPredicate);
                 dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 4e902b9..b76dd4d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -79,6 +79,7 @@
     private String scanState;
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
+    private String filterExpressionStr;
 
     public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -133,8 +134,14 @@
         if (filterExpression != null) {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema)
                     .withFilter(engine, (Predicate) filterExpression).build();
+            if (scan.getRemainingFilter().isPresent()) {
+                filterExpressionStr = PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get());
+            } else {
+                filterExpressionStr = null;
+            }
         } else {
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+            filterExpressionStr = null;
         }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         List<Row> scanFiles;
@@ -145,6 +152,7 @@
             // We need to fall back to skip applying the filter and return all files.
             LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage());
             scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+            filterExpressionStr = null;
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
@@ -206,7 +214,7 @@
         try {
             int partition = context.getPartition();
             return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    confFactory);
+                    confFactory, filterExpressionStr);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
new file mode 100644
index 0000000..efd47f5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PredicateSerDe.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+/**
+ * Utility class to serialize and deserialize {@link Predicate} object.
+ */
+public class PredicateSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private PredicateSerDe() {
+    }
+
+    public static String serializeExpressionToJson(Expression expression) {
+        Map<String, Object> expressionObject = visitExpression(expression);
+        try {
+            return OBJECT_MAPPER.writeValueAsString(expressionObject);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Optional<Predicate> deserializeExpressionFromJson(String jsonExpression) {
+        try {
+            if (jsonExpression == null) {
+                return Optional.empty();
+            }
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonExpression);
+            return Optional.of((Predicate) visitExpression((ObjectNode) jsonNode));
+        } catch (JsonProcessingException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public static Map<String, Object> visitPredicate(Predicate predicate) {
+        Map<String, Object> predicateObject = new HashMap<>();
+        predicateObject.put("type", "predicate");
+        predicateObject.put("name", predicate.getName());
+        predicateObject.put("left", visitExpression(predicate.getChildren().get(0)));
+        predicateObject.put("right", visitExpression(predicate.getChildren().get(1)));
+        return predicateObject;
+    }
+
+    public static Map<String, Object> visitLiteral(Literal literal) {
+        Map<String, Object> literalObject = new HashMap<>();
+        literalObject.put("type", "literal");
+        literalObject.put("dataType", literal.getDataType().toString());
+        literalObject.put("value", literal.getValue());
+        return literalObject;
+    }
+
+    public static Map<String, Object> visitColumn(Column column) {
+        Map<String, Object> columnObject = new HashMap<>();
+        columnObject.put("type", "column");
+        columnObject.put("names", column.getNames());
+        return columnObject;
+    }
+
+    private static Map<String, Object> visitExpression(Expression expression) {
+        return switch (expression) {
+            case Predicate predicate -> visitPredicate(predicate);
+            case Column column -> visitColumn(column);
+            case Literal literal -> visitLiteral(literal);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + expression);
+        };
+    }
+
+    public static Predicate visitPredicate(ObjectNode node) {
+        return new Predicate(node.get("name").asText(), visitExpression((ObjectNode) node.get("left")),
+                visitExpression((ObjectNode) node.get("right")));
+    }
+
+    public static Literal visitLiteral(ObjectNode node) {
+        switch (node.get("dataType").asText()) {
+            case "boolean" : return Literal.ofBoolean(node.get("value").asBoolean());
+            case "byte" : return Literal.ofByte((byte) node.get("value").asInt());
+            case "short" : return Literal.ofShort(node.get("value").shortValue());
+            case "integer" : return Literal.ofInt(node.get("value").asInt());
+            case "long" : return Literal.ofLong(node.get("value").asLong());
+            case "float" : return Literal.ofFloat(node.get("value").floatValue());
+            case "double" : return Literal.ofDouble(node.get("value").doubleValue());
+            case "date" : return Literal.ofDate(node.get("value").asInt());
+            case "timestamp" : return Literal.ofTimestamp(node.get("value").asLong());
+            case "string" : return Literal.ofString(node.get("value").asText());
+            case null, default : throw new UnsupportedOperationException("Unsupported literal type: " + node.get("dataType").asText());
+        }
+    }
+
+    public static Column visitColumn(ObjectNode node) {
+        if (node.get("names").isArray()) {
+            return new Column(StreamSupport.stream(node.get("names").spliterator(), false).map(JsonNode::asText)
+                    .toArray(String[]::new));
+        } else {
+            return new Column(node.get("names").asText());
+        }
+    }
+
+    private static Expression visitExpression(ObjectNode node) {
+        return switch (node.get("type").asText()) {
+            case "predicate" -> visitPredicate(node);
+            case "column" -> visitColumn(node);
+            case "literal" -> visitLiteral(node);
+            case null, default -> throw new UnsupportedOperationException("Unsupported expression type: " + node.get("type").asText());
+        };
+    }
+}
