[ASTERIXDB-3561][EXT] Predicate pushdown for Delta Tables

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Predicates are pushed down to reduce the number of files to scan.

Ext-ref: MB-65217

Change-Id: If1a46db488ee0f26aeea27069a4668665d1781dc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19406
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index 16aeecb..a101bed 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
+import org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
 import org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
@@ -118,6 +119,7 @@
         }
         // Performs prefix pushdowns
         pushdownProcessorsExecutor.add(new ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor.add(new DeltaTableFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor
                 .add(new ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
         // Inlines AND/OR expression (must be last to run)
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
new file mode 100644
index 0000000..8e5bf5b
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/DeltaTableFilterPushdownProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class DeltaTableFilterPushdownProcessor extends ColumnFilterPushdownProcessor {
+
+    public DeltaTableFilterPushdownProcessor(PushdownContext pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws AlgebricksException {
+        return !DatasetUtil.isDeltaTable(scanDefineDescriptor.getDataset());
+    }
+
+    @Override
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) {
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return ARRAY_FUNCTIONS.contains(fid) || super.isNotPushable(expression);
+    }
+
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        paths.put(expression, expressionPath);
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c0b5e26..9a0be3f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -446,7 +446,8 @@
         loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_nine", PARQUET_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one/_delta_log", JSON_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/");
-
+        loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table", PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table/_delta_log", JSON_FILTER, "delta-data/");
     }
 
     private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 67d460c..7c8840a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -23,8 +23,10 @@
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -59,6 +61,8 @@
             "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_one";
     public static final String DELTA_FILE_SIZE_NINE =
             "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine";
+    public static final String DELTA_PARTITIONED_TABLE =
+            "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "partitioned_delta_table";
 
     public static void prepareDeltaTableContainer(Configuration conf) {
         File basePath = new File(".");
@@ -68,6 +72,7 @@
         prepareEmptyTable(conf);
         prepareFileSizeOne(conf);
         prepareFileSizeNine(conf);
+        preparePartitionedTable(conf);
     }
 
     public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) {
@@ -352,4 +357,145 @@
             throw new RuntimeException(e);
         }
     }
+
+    public static void preparePartitionedTable(Configuration conf) {
+        Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name")
+                .requiredString("date").requiredString("hour").endRecord();
+        try {
+            List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema),
+                    new GenericData.Record(schema), new GenericData.Record(schema));
+            List<GenericData.Record> fileSecondSnapshotRecords =
+                    List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+            List<GenericData.Record> fileThirdSnapshotRecords =
+                    List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+            List<GenericData.Record> fileFourthSnapshotRecords =
+                    List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+
+            fileFirstSnapshotRecords.get(0).put("id", 0);
+            fileFirstSnapshotRecords.get(0).put("name", "Order 1");
+            fileFirstSnapshotRecords.get(0).put("date", "01-01-2025");
+            fileFirstSnapshotRecords.get(0).put("hour", 10);
+
+            fileFirstSnapshotRecords.get(1).put("id", 1);
+            fileFirstSnapshotRecords.get(1).put("name", "Order 2");
+            fileFirstSnapshotRecords.get(1).put("date", "01-01-2025");
+            fileFirstSnapshotRecords.get(1).put("hour", 10);
+
+            fileFirstSnapshotRecords.get(2).put("id", 2);
+            fileFirstSnapshotRecords.get(2).put("name", "Order 3");
+            fileFirstSnapshotRecords.get(2).put("date", "01-01-2025");
+            fileFirstSnapshotRecords.get(2).put("hour", 10);
+
+            fileSecondSnapshotRecords.get(0).put("id", 3);
+            fileSecondSnapshotRecords.get(0).put("name", "Order 10");
+            fileSecondSnapshotRecords.get(0).put("date", "01-01-2025");
+            fileSecondSnapshotRecords.get(0).put("hour", 15);
+
+            fileSecondSnapshotRecords.get(1).put("id", 4);
+            fileSecondSnapshotRecords.get(1).put("name", "Order 11");
+            fileSecondSnapshotRecords.get(1).put("date", "01-01-2025");
+            fileSecondSnapshotRecords.get(1).put("hour", 15);
+
+            fileThirdSnapshotRecords.get(0).put("id", 5);
+            fileThirdSnapshotRecords.get(0).put("name", "Order 21");
+            fileThirdSnapshotRecords.get(0).put("date", "01-02-2025");
+            fileThirdSnapshotRecords.get(0).put("hour", 12);
+
+            fileThirdSnapshotRecords.get(1).put("id", 6);
+            fileThirdSnapshotRecords.get(1).put("name", "Order 22");
+            fileThirdSnapshotRecords.get(1).put("date", "01-02-2025");
+            fileThirdSnapshotRecords.get(1).put("hour", 12);
+
+            fileFourthSnapshotRecords.get(0).put("id", 7);
+            fileFourthSnapshotRecords.get(0).put("name", "Order 30");
+            fileFourthSnapshotRecords.get(0).put("date", "01-02-2025");
+            fileFourthSnapshotRecords.get(0).put("hour", 16);
+
+            fileFourthSnapshotRecords.get(1).put("id", 8);
+            fileFourthSnapshotRecords.get(1).put("name", "Order 31");
+            fileFourthSnapshotRecords.get(1).put("date", "01-02-2025");
+            fileFourthSnapshotRecords.get(1).put("hour", 16);
+
+            Path path = new Path(DELTA_PARTITIONED_TABLE, "firstFile.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileFirstSnapshotRecords) {
+                writer.write(record);
+            }
+            long size = writer.getDataSize();
+            writer.close();
+
+            Path path2 = new Path(DELTA_PARTITIONED_TABLE, "secondFile.parquet");
+            ParquetWriter<GenericData.Record> writer2 =
+                    AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileSecondSnapshotRecords) {
+                writer2.write(record);
+            }
+            long size2 = writer2.getDataSize();
+            writer2.close();
+
+            Path path3 = new Path(DELTA_PARTITIONED_TABLE, "thirdFile.parquet");
+            ParquetWriter<GenericData.Record> writer3 =
+                    AvroParquetWriter.<GenericData.Record> builder(path3).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileThirdSnapshotRecords) {
+                writer3.write(record);
+            }
+            long size3 = writer3.getDataSize();
+            writer3.close();
+
+            Path path4 = new Path(DELTA_PARTITIONED_TABLE, "fourthFile.parquet");
+            ParquetWriter<GenericData.Record> writer4 =
+                    AvroParquetWriter.<GenericData.Record> builder(path4).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileFourthSnapshotRecords) {
+                writer4.write(record);
+            }
+            long size4 = writer4.getDataSize();
+            writer4.close();
+
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_PARTITIONED_TABLE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = txn.metadata().copyBuilder().partitionColumns(Arrays.asList("date", "hour"))
+                    .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+                            .add(new StructField("name", new StringType(), true))
+                            .add(new StructField("date", new StringType(), true))
+                            .add(new StructField("hour", new IntegerType(), true)))
+                    .build();
+
+            Map<String, String> partitionValues = new HashMap<>();
+            partitionValues.put("date", "01-01-2025");
+            partitionValues.put("hour", "10");
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", partitionValues, size,
+                    System.currentTimeMillis(), true, null, null));
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+            txn = log.startTransaction();
+            partitionValues.clear();
+            partitionValues.put("date", "01-01-2025");
+            partitionValues.put("hour", "15");
+            actions = List.of(new AddFile("secondFile.parquet", partitionValues, size2, System.currentTimeMillis(),
+                    true, null, null));
+            txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+            txn = log.startTransaction();
+            partitionValues.clear();
+            partitionValues.put("date", "01-02-2025");
+            partitionValues.put("hour", "12");
+            actions = List.of(new AddFile("thirdFile.parquet", partitionValues, size3, System.currentTimeMillis(), true,
+                    null, null));
+            txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+            txn = log.startTransaction();
+            partitionValues.clear();
+            partitionValues.put("date", "01-02-2025");
+            partitionValues.put("hour", "16");
+            actions = List.of(new AddFile("fourthFile.parquet", partitionValues, size4, System.currentTimeMillis(),
+                    true, null, null));
+            txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
new file mode 100644
index 0000000..a085d6f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/partitioned_delta_table"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
new file mode 100644
index 0000000..73ef0f9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds WHERE date = "01-01-2025" order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
new file mode 100644
index 0000000..86d41fe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-partitioned-file-read/read-data.2.adm
@@ -0,0 +1,5 @@
+{ "id": 0, "name": "Order 1", "date": "01-01-2025", "hour": 10 }
+{ "id": 1, "name": "Order 2", "date": "01-01-2025", "hour": 10 }
+{ "id": 2, "name": "Order 3", "date": "01-01-2025", "hour": 10 }
+{ "id": 3, "name": "Order 10", "date": "01-01-2025", "hour": 15 }
+{ "id": 4, "name": "Order 11", "date": "01-01-2025", "hour": 15 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 1a05334..e8e89de 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -575,6 +575,12 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-partitioned-file-read">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-partitioned-file-read</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
         <placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000..c954f9b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/DeltaTableFilterEvaluatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import io.delta.kernel.expressions.Expression;
+
+public class DeltaTableFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+    private final Expression filterExpression;
+
+    public DeltaTableFilterEvaluatorFactory(Expression expression) {
+        this.filterExpression = expression;
+    }
+
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+
+    public Expression getFilterExpression() {
+        return filterExpression;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000..406a630
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpDeltaTableFilterEvaluatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class NoOpDeltaTableFilterEvaluatorFactory extends DeltaTableFilterEvaluatorFactory {
+    public static final IExternalFilterEvaluatorFactory INSTANCE = new NoOpDeltaTableFilterEvaluatorFactory();
+    private static final long serialVersionUID = 1L;
+
+    private NoOpDeltaTableFilterEvaluatorFactory() {
+        super(null);
+    }
+
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector) {
+        return NoOpExternalFilterEvaluator.INSTANCE;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
+        return NoOpFilterValueEmbedder.INSTANCE;
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index e3313f7..3c998a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
 import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.HDFSUtils;
@@ -62,6 +63,8 @@
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
@@ -124,19 +127,35 @@
         } catch (AsterixDeltaRuntimeException e) {
             throw e.getHyracksDataException();
         }
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+        Expression filterExpression = ((DeltaTableFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression();
+        Scan scan;
+        if (filterExpression != null) {
+            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema)
+                    .withFilter(engine, (Predicate) filterExpression).build();
+        } else {
+            scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+        }
         scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
 
         List<Row> scanFiles = new ArrayList<>();
         while (iter.hasNext()) {
-            FilteredColumnarBatch batch = iter.next();
+            FilteredColumnarBatch batch = null;
+            try {
+                batch = iter.next();
+            } catch (UnsupportedOperationException e) {
+                // Failed to apply expression due to type mismatch. We can skip the files where partitioned column
+                // type is different from the type of value provided in the predicate
+                LOGGER.info("Unsupported operation {}", e.getMessage());
+                continue;
+            }
             CloseableIterator<Row> rowIter = batch.getRows();
             while (rowIter.hasNext()) {
                 Row row = rowIter.next();
                 scanFiles.add(row);
             }
         }
+        LOGGER.info("Number of files to scan: {}", scanFiles.size());
         locationConstraints = getPartitions(appCtx);
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
         distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 923dbd4..7df9b47 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -744,4 +744,9 @@
         return dataset.getDatasetType() == DatasetType.INTERNAL
                 && dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.COLUMN;
     }
+
+    public static boolean isDeltaTable(Dataset dataset) {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL && ExternalDataUtils
+                .isDeltaTable(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index dda2111..5939290 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata.utils;
 
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
@@ -40,6 +41,7 @@
 import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
 import org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
@@ -49,6 +51,7 @@
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.DeltaTableFilterBuilder;
 import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
 import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.base.IAObject;
@@ -335,16 +338,25 @@
     public static IExternalFilterEvaluatorFactory createExternalFilterEvaluatorFactory(JobGenContext context,
             IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo projectionFiltrationInfo,
             Map<String, String> properties) throws AlgebricksException {
-        if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
-            return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+        if (isDeltaTable(properties)) {
+            if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+                return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+            } else {
+                DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
+                        (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
+                return builder.build();
+            }
+        } else {
+            if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
+                return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+            } else {
+                ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+                ExternalDatasetProjectionFiltrationInfo pfi =
+                        (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
+                ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+                return build.build();
+            }
         }
-
-        ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
-        ExternalDatasetProjectionFiltrationInfo pfi =
-                (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
-        ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
-
-        return build.build();
     }
 
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
index ce39220..f41a116 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/AbstractFilterBuilder.java
@@ -100,7 +100,7 @@
         return argsEvalFactories;
     }
 
-    private IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+    protected IFunctionDescriptor resolveFunction(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
         MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
         IFunctionManager functionManager = metadataProvider.getFunctionManager();
         FunctionIdentifier fnId = funcExpr.getFunctionIdentifier();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
new file mode 100644
index 0000000..73ed81e
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/DeltaTableFilterBuilder.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.DeltaTableFilterEvaluatorFactory;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt16;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AInt8;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.logging.log4j.LogManager;
+
+import com.microsoft.azure.storage.core.Logger;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+
+public class DeltaTableFilterBuilder extends AbstractFilterBuilder {
+
+    private static final org.apache.logging.log4j.Logger LOGGER = LogManager.getLogger();
+
+    public DeltaTableFilterBuilder(ExternalDatasetProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) {
+        super(projectionFiltrationInfo.getFilterPaths(), projectionFiltrationInfo.getFilterExpression(), context,
+                typeEnv);
+    }
+
+    public IExternalFilterEvaluatorFactory build() throws AlgebricksException {
+        Expression deltaTablePredicate = null;
+        if (filterExpression != null) {
+            try {
+                deltaTablePredicate = createExpression(filterExpression);
+            } catch (Exception e) {
+                LOGGER.error("Error creating DeltaTable filter expression ", e);
+            }
+        }
+        if (deltaTablePredicate != null && !(deltaTablePredicate instanceof Predicate)) {
+            deltaTablePredicate = null;
+        }
+        return new DeltaTableFilterEvaluatorFactory(deltaTablePredicate);
+    }
+
+    protected Expression createExpression(ILogicalExpression expression) throws AlgebricksException {
+        if (filterPaths.containsKey(expression)) {
+            // Path expression, create a value accessor (i.e., a column reader)
+            return createColumnExpression(expression);
+        } else if (expression.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+            return createLiteralExpression(expression);
+        } else if (expression.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            return handleFunction(expression);
+        }
+
+        /*
+         * A variable expression: This should not happen as the provided filter expression is inlined.
+         * If a variable was encountered for some reason, it should only be the record variable. If the record variable
+         * was encountered, that means there's a missing value path the compiler didn't provide.
+         */
+        throw new RuntimeException("Unsupported expression " + expression + ". the provided paths are: " + filterPaths);
+    }
+
+    private Expression createLiteralExpression(ILogicalExpression expression) throws AlgebricksException {
+        ConstantExpression constExpr = (ConstantExpression) expression;
+        if (constExpr.getValue().isNull() || constExpr.getValue().isMissing()) {
+            throw new RuntimeException("Unsupported literal type: " + constExpr.getValue());
+        }
+        AsterixConstantValue constantValue = (AsterixConstantValue) constExpr.getValue();
+        switch (constantValue.getObject().getType().getTypeTag()) {
+            case STRING:
+                return Literal.ofString(((AString) constantValue.getObject()).getStringValue());
+            case TINYINT:
+                return Literal.ofByte(((AInt8) constantValue.getObject()).getByteValue());
+            case SMALLINT:
+                return Literal.ofShort(((AInt16) constantValue.getObject()).getShortValue());
+            case INTEGER:
+                return Literal.ofInt(((AInt32) constantValue.getObject()).getIntegerValue());
+            case BOOLEAN:
+                return Literal.ofBoolean(constantValue.isTrue());
+            case BIGINT:
+                return Literal.ofLong(((AInt64) constantValue.getObject()).getLongValue());
+            case DOUBLE:
+                return Literal.ofDouble(((ADouble) constantValue.getObject()).getDoubleValue());
+            case DATE:
+                return Literal.ofDate(((ADate) constantValue.getObject()).getChrononTimeInDays());
+            case DATETIME:
+                Long millis = ((ADateTime) constantValue.getObject()).getChrononTime();
+                return Literal.ofTimestamp(TimeUnit.MILLISECONDS.toMicros(millis));
+            default:
+                throw new RuntimeException("Unsupported literal type: " + constantValue.getObject().getType());
+        }
+    }
+
+    @Override
+    protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression expression) {
+        return null;
+    }
+
+    private Expression handleFunction(ILogicalExpression expr) throws AlgebricksException {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        IFunctionDescriptor fd = resolveFunction(funcExpr);
+        List<Expression> args = handleArgs(funcExpr);
+        FunctionIdentifier fid = fd.getIdentifier();
+        if (fid.equals(AlgebricksBuiltinFunctions.AND)) {
+            return new Predicate("AND", args);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.OR)) {
+            return new Predicate("OR", args);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.EQ)) {
+            return new Predicate("=", args);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GE)) {
+            return new Predicate(">=", args);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.GT)) {
+            return new Predicate(">", args);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LE)) {
+            return new Predicate("<=", args);
+        } else if (fid.equals(AlgebricksBuiltinFunctions.LT)) {
+            return new Predicate("<", args);
+        } else {
+            throw new RuntimeException("Unsupported function: " + funcExpr);
+        }
+    }
+
+    private List<Expression> handleArgs(AbstractFunctionCallExpression funcExpr) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> args = funcExpr.getArguments();
+        List<Expression> argsExpressions = new ArrayList<>();
+        for (int i = 0; i < args.size(); i++) {
+            ILogicalExpression expr = args.get(i).getValue();
+            Expression evalFactory = createExpression(expr);
+            argsExpressions.add(evalFactory);
+        }
+        return argsExpressions;
+    }
+
+    protected Column createColumnExpression(ILogicalExpression expression) {
+        ARecordType path = filterPaths.get(expression);
+        if (path.getFieldNames().length != 1) {
+            throw new RuntimeException("Unsupported expression: " + expression);
+        }
+        return new Column(path.getFieldNames()[0]);
+    }
+}