[ASTERIXDB-3602][EXT] Support reading Delta tables with ISO8601-formatted timestamp partitions
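
Delta stores partition values as strings in the commit log. For tables
partitioned on a TIMESTAMP column, writers may serialize the value either
as "yyyy-MM-dd HH:mm:ss[.SSSSSS]" or in ISO8601 form (e.g.
"2025-01-01T00:01:30Z"); the default kernel expression handler does not
accept the ISO8601 form. This change plugs a custom engine (DeltaEngine ->
DeltaExpressionHandler -> DeltaExpressionEvaluator) into the Delta file
reader so partition-value expressions are parsed leniently, and adds a
timestamp-partitioned test table covering both encodings.

A minimal sketch of the lenient conversion involved (illustrative only;
this helper does not appear verbatim in the change):

    // Hypothetical example: convert a Delta partition-value string to
    // epoch microseconds, accepting both textual forms above.
    static long partitionValueToMicros(String v) {
        java.time.Instant instant = v.endsWith("Z")
                ? java.time.Instant.parse(v) // e.g. 2025-01-01T00:01:30Z
                : java.time.LocalDateTime.parse(v.replace(' ', 'T'))
                        .toInstant(java.time.ZoneOffset.UTC); // e.g. 2025-01-01 00:01:20.00
        return java.time.temporal.ChronoUnit.MICROS.between(java.time.Instant.EPOCH, instant);
    }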

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-66229

Change-Id: I2cf9e532882dad8c778b43d72f3f1bcab37d5a00
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19693
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index 4080f4c..d31fb66 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -449,6 +449,9 @@
         loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table", PARQUET_FILTER, "delta-data/");
         loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table/_delta_log", JSON_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/timestamp_partitioned_delta_table", PARQUET_FILTER, "delta-data/");
+        loadDeltaDirectory(generatedDataBasePath, "/timestamp_partitioned_delta_table/_delta_log", JSON_FILTER,
+                "delta-data/");
     }
 
     private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index 7c8840a..a3b20f8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -48,6 +48,7 @@
 import io.delta.standalone.types.StringType;
 import io.delta.standalone.types.StructField;
 import io.delta.standalone.types.StructType;
+import io.delta.standalone.types.TimestampType;
 
 public class DeltaTableGenerator {
     public static final String DELTA_GEN_BASEDIR = "target" + File.separatorChar + "generated_delta_files";
@@ -63,6 +64,8 @@
             "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine";
     public static final String DELTA_PARTITIONED_TABLE =
             "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "partitioned_delta_table";
+    public static final String DELTA_TIMESTAMP_PARTITIONED_TABLE = "target" + File.separatorChar
+            + "generated_delta_files" + File.separatorChar + "timestamp_partitioned_delta_table";
 
     public static void prepareDeltaTableContainer(Configuration conf) {
         File basePath = new File(".");
@@ -73,6 +76,7 @@
         prepareFileSizeOne(conf);
         prepareFileSizeNine(conf);
         preparePartitionedTable(conf);
+        prepareTimestampPartitionedTable(conf);
     }
 
     public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) {
@@ -498,4 +502,129 @@
         }
     }
 
+    public static void prepareTimestampPartitionedTable(Configuration conf) {
+        Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name")
+                .requiredString("timestamp").endRecord();
+        try {
+            List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema),
+                    new GenericData.Record(schema), new GenericData.Record(schema));
+            List<GenericData.Record> fileSecondSnapshotRecords =
+                    List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+            List<GenericData.Record> fileThirdSnapshotRecords =
+                    List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+            List<GenericData.Record> fileFourthSnapshotRecords =
+                    List.of(new GenericData.Record(schema), new GenericData.Record(schema));
+
+            fileFirstSnapshotRecords.get(0).put("id", 0);
+            fileFirstSnapshotRecords.get(0).put("name", "Order 1");
+            fileFirstSnapshotRecords.get(0).put("timestamp", "2025-01-01 00:01:20");
+
+            fileFirstSnapshotRecords.get(1).put("id", 1);
+            fileFirstSnapshotRecords.get(1).put("name", "Order 2");
+            fileFirstSnapshotRecords.get(1).put("timestamp", "2025-01-01 00:01:20.00");
+
+            fileFirstSnapshotRecords.get(2).put("id", 2);
+            fileFirstSnapshotRecords.get(2).put("name", "Order 3");
+            fileFirstSnapshotRecords.get(2).put("timestamp", "2025-01-01 00:01:20.000000");
+
+            fileSecondSnapshotRecords.get(0).put("id", 3);
+            fileSecondSnapshotRecords.get(0).put("name", "Order 10");
+            fileSecondSnapshotRecords.get(0).put("timestamp", "2025-01-01T00:01:30Z");
+
+            fileSecondSnapshotRecords.get(1).put("id", 4);
+            fileSecondSnapshotRecords.get(1).put("name", "Order 11");
+            fileSecondSnapshotRecords.get(1).put("timestamp", "2025-01-01T00:01:30.000000Z");
+
+            fileThirdSnapshotRecords.get(0).put("id", 5);
+            fileThirdSnapshotRecords.get(0).put("name", "Order 21");
+            fileThirdSnapshotRecords.get(0).put("timestamp", "2025-01-02 00:02:20.100");
+
+            fileThirdSnapshotRecords.get(1).put("id", 6);
+            fileThirdSnapshotRecords.get(1).put("name", "Order 22");
+            fileThirdSnapshotRecords.get(1).put("timestamp", "2025-01-02 00:02:20.100");
+
+            fileFourthSnapshotRecords.get(0).put("id", 7);
+            fileFourthSnapshotRecords.get(0).put("name", "Order 30");
+            fileFourthSnapshotRecords.get(0).put("timestamp", "2025-01-02T00:02:30.100000Z");
+
+            fileFourthSnapshotRecords.get(1).put("id", 8);
+            fileFourthSnapshotRecords.get(1).put("name", "Order 31");
+            fileFourthSnapshotRecords.get(1).put("timestamp", "2025-01-02T00:02:30.100000Z");
+
+            Path path = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "firstFile.parquet");
+            ParquetWriter<GenericData.Record> writer =
+                    AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileFirstSnapshotRecords) {
+                writer.write(record);
+            }
+            long size = writer.getDataSize();
+            writer.close();
+
+            Path path2 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "secondFile.parquet");
+            ParquetWriter<GenericData.Record> writer2 =
+                    AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileSecondSnapshotRecords) {
+                writer2.write(record);
+            }
+            long size2 = writer2.getDataSize();
+            writer2.close();
+
+            Path path3 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "thirdFile.parquet");
+            ParquetWriter<GenericData.Record> writer3 =
+                    AvroParquetWriter.<GenericData.Record> builder(path3).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileThirdSnapshotRecords) {
+                writer3.write(record);
+            }
+            long size3 = writer3.getDataSize();
+            writer3.close();
+
+            Path path4 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "fourthFile.parquet");
+            ParquetWriter<GenericData.Record> writer4 =
+                    AvroParquetWriter.<GenericData.Record> builder(path4).withConf(conf).withSchema(schema).build();
+            for (GenericData.Record record : fileFourthSnapshotRecords) {
+                writer4.write(record);
+            }
+            long size4 = writer4.getDataSize();
+            writer4.close();
+
+            DeltaLog log = DeltaLog.forTable(conf, DELTA_TIMESTAMP_PARTITIONED_TABLE);
+            OptimisticTransaction txn = log.startTransaction();
+            Metadata metaData = txn.metadata().copyBuilder().partitionColumns(Arrays.asList("timestamp"))
+                    .schema(new StructType().add(new StructField("id", new IntegerType(), true))
+                            .add(new StructField("name", new StringType(), true))
+                            .add(new StructField("timestamp", new TimestampType(), true)))
+                    .build();
+
+            Map<String, String> partitionValues = new HashMap<>();
+            partitionValues.put("timestamp", "2025-01-01 00:01:20");
+            List<Action> actions = List.of(new AddFile("firstFile.parquet", partitionValues, size,
+                    System.currentTimeMillis(), true, null, null));
+            txn.updateMetadata(metaData);
+            txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
+
+            txn = log.startTransaction();
+            partitionValues.clear();
+            partitionValues.put("timestamp", "2025-01-01T00:01:30Z");
+            actions = List.of(new AddFile("secondFile.parquet", partitionValues, size2, System.currentTimeMillis(),
+                    true, null, null));
+            txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+            txn = log.startTransaction();
+            partitionValues.clear();
+            partitionValues.put("timestamp", "2025-01-02 00:02:20.100");
+            actions = List.of(new AddFile("thirdFile.parquet", partitionValues, size3, System.currentTimeMillis(), true,
+                    null, null));
+            txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+            txn = log.startTransaction();
+            partitionValues.clear();
+            partitionValues.put("timestamp", "2025-01-02T00:02:30.100000Z");
+            actions = List.of(new AddFile("fourthFile.parquet", partitionValues, size4, System.currentTimeMillis(),
+                    true, null, null));
+            txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create");
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
new file mode 100644
index 0000000..da4b0e8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/timestamp_partitioned_delta_table"),
+   ("table-format" = "delta")
+ );
+
+  CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter%
+  (
+    %template%,
+    ("container"="playground"),
+    ("definition"="delta-data/timestamp_partitioned_delta_table"),
+    ("table-format" = "delta"),
+    ("timestamp-to-long" = "false") 
+  );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
new file mode 100644
index 0000000..bfd6581
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset as ds order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp
new file mode 100644
index 0000000..a84d381
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE test;
+
+ SELECT element ds FROM DeltalakeDataset2 as ds where ds.timestamp=datetime("2025-01-01T00:01:20Z") order by ds.id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm
new file mode 100644
index 0000000..dabfb4f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm
@@ -0,0 +1,9 @@
+{ "id": 0, "name": "Order 1", "timestamp": 1735689680000 }
+{ "id": 1, "name": "Order 2", "timestamp": 1735689680000 }
+{ "id": 2, "name": "Order 3", "timestamp": 1735689680000 }
+{ "id": 3, "name": "Order 10", "timestamp": 1735689690000 }
+{ "id": 4, "name": "Order 11", "timestamp": 1735689690000 }
+{ "id": 5, "name": "Order 21", "timestamp": 1735776140100 }
+{ "id": 6, "name": "Order 22", "timestamp": 1735776140100 }
+{ "id": 7, "name": "Order 30", "timestamp": 1735776150100 }
+{ "id": 8, "name": "Order 31", "timestamp": 1735776150100 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm
new file mode 100644
index 0000000..c185038
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm
@@ -0,0 +1,3 @@
+{ "id": 0, "name": "Order 1", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 1, "name": "Order 2", "timestamp": datetime("2025-01-01T00:01:20.000") }
+{ "id": 2, "name": "Order 3", "timestamp": datetime("2025-01-01T00:01:20.000")
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 38ebf01..c9f9da6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -639,6 +639,12 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-timestamp-partitioned-file-read">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/deltalake-timestamp-partitioned-file-read</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
         <placeholder name="path_prefix" value="" />
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
new file mode 100644
index 0000000..560360c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Utility methods used by the default expression evaluator.
+ */
+class DefaultExpressionUtils {
+    private DefaultExpressionUtils() {
+    }
+
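+    /** Compares strings by their UTF-8 encoded bytes, each byte treated as unsigned. */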
+    static final Comparator<String> STRING_COMPARATOR = (leftOp, rightOp) -> {
+        byte[] leftBytes = leftOp.getBytes(StandardCharsets.UTF_8);
+        byte[] rightBytes = rightOp.getBytes(StandardCharsets.UTF_8);
+        int i = 0;
+        while (i < leftBytes.length && i < rightBytes.length) {
+            if (leftBytes[i] != rightBytes[i]) {
+                return Byte.toUnsignedInt(leftBytes[i]) - Byte.toUnsignedInt(rightBytes[i]);
+            }
+            i++;
+        }
+        return Integer.compare(leftBytes.length, rightBytes.length);
+    };
+
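+    /** Compares byte arrays lexicographically, each byte treated as unsigned. */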
+    static final Comparator<byte[]> BINARY_COMPARATOR = (leftOp, rightOp) -> {
+        int i = 0;
+        while (i < leftOp.length && i < rightOp.length) {
+            if (leftOp[i] != rightOp[i]) {
+                return Byte.toUnsignedInt(leftOp[i]) - Byte.toUnsignedInt(rightOp[i]);
+            }
+            i++;
+        }
+        return Integer.compare(leftOp.length, rightOp.length);
+    };
+
+    /**
+     * Utility method that calculates the nullability result from given two vectors. Result is
+     * null if at least one side is a null.
+     */
+    static boolean[] evalNullability(ColumnVector left, ColumnVector right) {
+        int numRows = left.getSize();
+        boolean[] nullability = new boolean[numRows];
+        for (int rowId = 0; rowId < numRows; rowId++) {
+            nullability[rowId] = left.isNullAt(rowId) || right.isNullAt(rowId);
+        }
+        return nullability;
+    }
+
+    /**
+     * Wraps a child vector as a boolean {@link ColumnVector} with the given value and nullability
+     * accessors.
+     */
+    static ColumnVector booleanWrapperVector(ColumnVector childVector, Function<Integer, Boolean> valueAccessor,
+            Function<Integer, Boolean> nullabilityAccessor) {
+
+        return new ColumnVector() {
+
+            @Override
+            public DataType getDataType() {
+                return BooleanType.BOOLEAN;
+            }
+
+            @Override
+            public int getSize() {
+                return childVector.getSize();
+            }
+
+            @Override
+            public void close() {
+                childVector.close();
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return nullabilityAccessor.apply(rowId);
+            }
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return valueAccessor.apply(rowId);
+            }
+        };
+    }
+
+    /**
+     * Utility method to compare the left and right according to the natural ordering
+     * and return an integer array where each row contains the comparison result (-1, 0, 1) for
+     * corresponding rows in the input vectors compared.
+     * <p>
+     * Only primitive data types are supported.
+     */
+    static int[] compare(ColumnVector left, ColumnVector right) {
+        checkArgument(left.getSize() == right.getSize(), "Left and right operand have different vector sizes.");
+        DataType dataType = left.getDataType();
+
+        int numRows = left.getSize();
+        int[] result = new int[numRows];
+        if (dataType instanceof BooleanType) {
+            compareBoolean(left, right, result);
+        } else if (dataType instanceof ByteType) {
+            compareByte(left, right, result);
+        } else if (dataType instanceof ShortType) {
+            compareShort(left, right, result);
+        } else if (dataType instanceof IntegerType || dataType instanceof DateType) {
+            compareInt(left, right, result);
+        } else if (dataType instanceof LongType || dataType instanceof TimestampType
+                || dataType instanceof TimestampNTZType) {
+            compareLong(left, right, result);
+        } else if (dataType instanceof FloatType) {
+            compareFloat(left, right, result);
+        } else if (dataType instanceof DoubleType) {
+            compareDouble(left, right, result);
+        } else if (dataType instanceof DecimalType) {
+            compareDecimal(left, right, result);
+        } else if (dataType instanceof StringType) {
+            compareString(left, right, result);
+        } else if (dataType instanceof BinaryType) {
+            compareBinary(left, right, result);
+        } else {
+            throw new UnsupportedOperationException(dataType + " cannot be compared.");
+        }
+        return result;
+    }
+
+    static void compareBoolean(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Boolean.compare(left.getBoolean(rowId), right.getBoolean(rowId));
+            }
+        }
+    }
+
+    static void compareByte(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Byte.compare(left.getByte(rowId), right.getByte(rowId));
+            }
+        }
+    }
+
+    static void compareShort(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Short.compare(left.getShort(rowId), right.getShort(rowId));
+            }
+        }
+    }
+
+    static void compareInt(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Integer.compare(left.getInt(rowId), right.getInt(rowId));
+            }
+        }
+    }
+
+    static void compareLong(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Long.compare(left.getLong(rowId), right.getLong(rowId));
+            }
+        }
+    }
+
+    static void compareFloat(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Float.compare(left.getFloat(rowId), right.getFloat(rowId));
+            }
+        }
+    }
+
+    static void compareDouble(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = Double.compare(left.getDouble(rowId), right.getDouble(rowId));
+            }
+        }
+    }
+
+    static void compareString(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = STRING_COMPARATOR.compare(left.getString(rowId), right.getString(rowId));
+            }
+        }
+    }
+
+    static void compareDecimal(ColumnVector left, ColumnVector right, int[] result) {
+        Comparator<BigDecimal> comparator = Comparator.naturalOrder();
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = comparator.compare(left.getDecimal(rowId), right.getDecimal(rowId));
+            }
+        }
+    }
+
+    static void compareBinary(ColumnVector left, ColumnVector right, int[] result) {
+        for (int rowId = 0; rowId < left.getSize(); rowId++) {
+            if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) {
+                result[rowId] = BINARY_COMPARATOR.compare(left.getBinary(rowId), right.getBinary(rowId));
+            }
+        }
+    }
+
+    static Expression childAt(Expression expression, int index) {
+        return expression.getChildren().get(index);
+    }
+
+    /**
+     * Combines a list of column vectors into one column vector based on the resolution of
+     * idxToReturn
+     * @param vectors List of ColumnVectors of the same data type with length >= 1
+     * @param idxToReturn Function that takes in a rowId and returns the index of the column vector
+     *                    to use as the return value
+     */
+    static ColumnVector combinationVector(List<ColumnVector> vectors, Function<Integer, Integer> idxToReturn) {
+        return new ColumnVector() {
+            // Store the last lookup value to avoid multiple looks up for same rowId.
+            // The general pattern is call `isNullAt(rowId)` followed by `getBoolean(rowId)` or
+            // some other value accessor. So the cache of one value is enough.
+            private int lastLookupRowId = -1;
+            private ColumnVector lastLookupVector = null;
+
+            @Override
+            public DataType getDataType() {
+                return vectors.get(0).getDataType();
+            }
+
+            @Override
+            public int getSize() {
+                return vectors.get(0).getSize();
+            }
+
+            @Override
+            public void close() {
+                Utils.closeCloseables(vectors.toArray(new ColumnVector[0]));
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return getVector(rowId).isNullAt(rowId);
+            }
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return getVector(rowId).getBoolean(rowId);
+            }
+
+            @Override
+            public byte getByte(int rowId) {
+                return getVector(rowId).getByte(rowId);
+            }
+
+            @Override
+            public short getShort(int rowId) {
+                return getVector(rowId).getShort(rowId);
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                return getVector(rowId).getInt(rowId);
+            }
+
+            @Override
+            public long getLong(int rowId) {
+                return getVector(rowId).getLong(rowId);
+            }
+
+            @Override
+            public float getFloat(int rowId) {
+                return getVector(rowId).getFloat(rowId);
+            }
+
+            @Override
+            public double getDouble(int rowId) {
+                return getVector(rowId).getDouble(rowId);
+            }
+
+            @Override
+            public byte[] getBinary(int rowId) {
+                return getVector(rowId).getBinary(rowId);
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return getVector(rowId).getString(rowId);
+            }
+
+            @Override
+            public BigDecimal getDecimal(int rowId) {
+                return getVector(rowId).getDecimal(rowId);
+            }
+
+            @Override
+            public MapValue getMap(int rowId) {
+                return getVector(rowId).getMap(rowId);
+            }
+
+            @Override
+            public ArrayValue getArray(int rowId) {
+                return getVector(rowId).getArray(rowId);
+            }
+
+            @Override
+            public ColumnVector getChild(int ordinal) {
+                return combinationVector(vectors.stream().map(v -> v.getChild(ordinal)).collect(Collectors.toList()),
+                        idxToReturn);
+            }
+
+            private ColumnVector getVector(int rowId) {
+                if (rowId == lastLookupRowId) {
+                    return lastLookupVector;
+                }
+                lastLookupRowId = rowId;
+                lastLookupVector = vectors.get(idxToReturn.apply(rowId));
+                return lastLookupVector;
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
new file mode 100644
index 0000000..1b4e12e
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.ExpressionHandler;
+
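+/**
+ * A {@link DefaultEngine} variant whose {@link ExpressionHandler} is the Asterix-specific
+ * {@link DeltaExpressionHandler}, so that expression evaluation (notably ISO8601-formatted
+ * timestamp partition values) is handled by the local evaluators.
+ */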
+public class DeltaEngine extends DefaultEngine {
+
+    protected DeltaEngine(Configuration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public ExpressionHandler getExpressionHandler() {
+        return new DeltaExpressionHandler();
+    }
+
+    public static DeltaEngine create(Configuration configuration) {
+        return new DeltaEngine(configuration);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
new file mode 100644
index 0000000..b3082b5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.ExpressionUtils.getLeft;
+import static io.delta.kernel.internal.util.ExpressionUtils.getRight;
+import static io.delta.kernel.internal.util.ExpressionUtils.getUnaryChild;
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.booleanWrapperVector;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.childAt;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.compare;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.evalNullability;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.internal.data.vector.DefaultBooleanVector;
+import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
+import io.delta.kernel.defaults.internal.expressions.DefaultExpressionEvaluator;
+import io.delta.kernel.expressions.AlwaysFalse;
+import io.delta.kernel.expressions.AlwaysTrue;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Or;
+import io.delta.kernel.expressions.PartitionValueExpression;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.ScalarExpression;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
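+/**
+ * Expression evaluator modeled on the kernel's {@link DefaultExpressionEvaluator}, re-implemented
+ * here so that partition-value expressions are evaluated by the local
+ * {@code PartitionValueEvaluator}, which also accepts ISO8601-formatted timestamp strings.
+ */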
+public class DeltaExpressionEvaluator extends DefaultExpressionEvaluator {
+
+    private final Expression expression;
+
+    public DeltaExpressionEvaluator(StructType structType, Expression expression, DataType dataType) {
+        super(structType, expression, dataType);
+        this.expression = expression;
+    }
+
+    @Override
+    public ColumnVector eval(ColumnarBatch input) {
+        return new ExpressionEvalVisitor(input).visit(expression);
+    }
+
+    /**
+     * Implementation of {@link ExpressionVisitor} to evaluate expression on a
+     * {@link ColumnarBatch}.
+     */
+    private static class ExpressionEvalVisitor extends ExpressionVisitor<ColumnVector> {
+        private final ColumnarBatch input;
+
+        ExpressionEvalVisitor(ColumnarBatch input) {
+            this.input = input;
+        }
+
+        /*
+        | Operand 1 | Operand 2 | `AND`      | `OR`       |
+        |-----------|-----------|------------|------------|
+        | True      | True      | True       | True       |
+        | True      | False     | False      | True       |
+        | True      | NULL      | NULL       | True       |
+        | False     | True      | False      | True       |
+        | False     | False     | False      | False      |
+        | False     | NULL      | False      | NULL       |
+        | NULL      | True      | NULL       | True       |
+        | NULL      | False     | False      | NULL       |
+        | NULL      | NULL      | NULL       | NULL       |
+         */
+        @Override
+        ColumnVector visitAnd(And and) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(and);
+            ColumnVector left = argResults.leftResult;
+            ColumnVector right = argResults.rightResult;
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = new boolean[numRows];
+            for (int rowId = 0; rowId < numRows; rowId++) {
+                boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId);
+                boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId);
+                boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId);
+                boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId);
+
+                if (leftIsFalse || rightIsFalse) {
+                    nullability[rowId] = false;
+                    result[rowId] = false;
+                } else if (leftIsTrue && rightIsTrue) {
+                    nullability[rowId] = false;
+                    result[rowId] = true;
+                } else {
+                    nullability[rowId] = true;
+                    // result[rowId] is undefined when nullability[rowId] = true
+                }
+            }
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        @Override
+        ColumnVector visitOr(Or or) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(or);
+            ColumnVector left = argResults.leftResult;
+            ColumnVector right = argResults.rightResult;
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = new boolean[numRows];
+            for (int rowId = 0; rowId < numRows; rowId++) {
+                boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId);
+                boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId);
+                boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId);
+                boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId);
+
+                if (leftIsTrue || rightIsTrue) {
+                    nullability[rowId] = false;
+                    result[rowId] = true;
+                } else if (leftIsFalse && rightIsFalse) {
+                    nullability[rowId] = false;
+                    result[rowId] = false;
+                } else {
+                    nullability[rowId] = true;
+                    // result[rowId] is undefined when nullability[rowId] = true
+                }
+            }
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        @Override
+        ColumnVector visitAlwaysTrue(AlwaysTrue alwaysTrue) {
+            return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), true);
+        }
+
+        @Override
+        ColumnVector visitAlwaysFalse(AlwaysFalse alwaysFalse) {
+            return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), false);
+        }
+
+        @Override
+        ColumnVector visitComparator(Predicate predicate) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(predicate);
+
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = evalNullability(argResults.leftResult, argResults.rightResult);
+            int[] compareResult = compare(argResults.leftResult, argResults.rightResult);
+            switch (predicate.getName()) {
+                case "=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] == 0;
+                    }
+                    break;
+                case ">":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] > 0;
+                    }
+                    break;
+                case ">=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] >= 0;
+                    }
+                    break;
+                case "<":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] < 0;
+                    }
+                    break;
+                case "<=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] <= 0;
+                    }
+                    break;
+                default:
+                    // We should never reach this based on the ExpressionVisitor
+                    throw new IllegalStateException(
+                            String.format("%s is not a recognized comparator", predicate.getName()));
+            }
+
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        @Override
+        ColumnVector visitLiteral(Literal literal) {
+            DataType dataType = literal.getDataType();
+            if (dataType instanceof BooleanType || dataType instanceof ByteType || dataType instanceof ShortType
+                    || dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType
+                    || dataType instanceof DoubleType || dataType instanceof StringType
+                    || dataType instanceof BinaryType || dataType instanceof DecimalType || dataType instanceof DateType
+                    || dataType instanceof TimestampType || dataType instanceof TimestampNTZType) {
+                return new DefaultConstantVector(dataType, input.getSize(), literal.getValue());
+            }
+
+            throw new UnsupportedOperationException("unsupported expression encountered: " + literal);
+        }
+
+        @Override
+        ColumnVector visitColumn(Column column) {
+            String[] names = column.getNames();
+            DataType currentType = input.getSchema();
+            ColumnVector columnVector = null;
+            for (int level = 0; level < names.length; level++) {
+                assertColumnExists(currentType instanceof StructType, input.getSchema(), column);
+                StructType structSchema = ((StructType) currentType);
+                int ordinal = structSchema.indexOf(names[level]);
+                assertColumnExists(ordinal != -1, input.getSchema(), column);
+                currentType = structSchema.at(ordinal).getDataType();
+
+                if (level == 0) {
+                    columnVector = input.getColumnVector(ordinal);
+                } else {
+                    columnVector = columnVector.getChild(ordinal);
+                }
+            }
+            assertColumnExists(columnVector != null, input.getSchema(), column);
+            return columnVector;
+        }
+
+        @Override
+        ColumnVector visitCast(ImplicitCastExpression cast) {
+            ColumnVector inputResult = visit(cast.getInput());
+            return cast.eval(inputResult);
+        }
+
+        @Override
+        ColumnVector visitPartitionValue(PartitionValueExpression partitionValue) {
+            ColumnVector input = visit(partitionValue.getInput());
+            return PartitionValueEvaluator.eval(input, partitionValue.getDataType());
+        }
+
+        @Override
+        ColumnVector visitElementAt(ScalarExpression elementAt) {
+            ColumnVector map = visit(childAt(elementAt, 0));
+            ColumnVector lookupKey = visit(childAt(elementAt, 1));
+            return ElementAtEvaluator.eval(map, lookupKey);
+        }
+
+        @Override
+        ColumnVector visitNot(Predicate predicate) {
+            ColumnVector childResult = visit(childAt(predicate, 0));
+            return booleanWrapperVector(childResult, rowId -> !childResult.getBoolean(rowId),
+                    rowId -> childResult.isNullAt(rowId));
+        }
+
+        @Override
+        ColumnVector visitIsNotNull(Predicate predicate) {
+            ColumnVector childResult = visit(childAt(predicate, 0));
+            return booleanWrapperVector(childResult, rowId -> !childResult.isNullAt(rowId), rowId -> false);
+        }
+
+        @Override
+        ColumnVector visitIsNull(Predicate predicate) {
+            ColumnVector childResult = visit(getUnaryChild(predicate));
+            return booleanWrapperVector(childResult, rowId -> childResult.isNullAt(rowId), rowId -> false);
+        }
+
+        @Override
+        ColumnVector visitCoalesce(ScalarExpression coalesce) {
+            List<ColumnVector> childResults =
+                    coalesce.getChildren().stream().map(this::visit).collect(Collectors.toList());
+            return DefaultExpressionUtils.combinationVector(childResults, rowId -> {
+                for (int idx = 0; idx < childResults.size(); idx++) {
+                    if (!childResults.get(idx).isNullAt(rowId)) {
+                        return idx;
+                    }
+                }
+                return 0; // If all are null then any idx suffices
+            });
+        }
+
+        /**
+         * Utility method to evaluate inputs to the binary input expression. Also validates the
+         * evaluated expression result {@link ColumnVector}s are of the same size.
+         *
+         * @param predicate
+         * @return Triplet of (result vector size, left operand result, right operand result)
+         */
+        private PredicateChildrenEvalResult evalBinaryExpressionChildren(Predicate predicate) {
+            ColumnVector left = visit(getLeft(predicate));
+            ColumnVector right = visit(getRight(predicate));
+            checkArgument(left.getSize() == right.getSize(),
+                    "Left and right operand returned different results: left=%d, right=d", left.getSize(),
+                    right.getSize());
+            return new PredicateChildrenEvalResult(left.getSize(), left, right);
+        }
+    }
+
+    /**
+     * Encapsulates children expression result of binary input predicate
+     */
+    private static class PredicateChildrenEvalResult {
+        public final int rowCount;
+        public final ColumnVector leftResult;
+        public final ColumnVector rightResult;
+
+        PredicateChildrenEvalResult(int rowCount, ColumnVector leftResult, ColumnVector rightResult) {
+            this.rowCount = rowCount;
+            this.leftResult = leftResult;
+            this.rightResult = rightResult;
+        }
+    }
+
+    private static void assertColumnExists(boolean condition, StructType schema, Column column) {
+        if (!condition) {
+            throw new IllegalArgumentException(
+                    String.format("%s doesn't exist in input data schema: %s", column, schema));
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
new file mode 100644
index 0000000..d9f5251
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import io.delta.kernel.defaults.engine.DefaultExpressionHandler;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.PredicateEvaluator;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
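+/**
+ * A {@link DefaultExpressionHandler} that supplies the Asterix-specific expression and predicate
+ * evaluators in place of the kernel defaults.
+ */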
+public class DeltaExpressionHandler extends DefaultExpressionHandler {
+
+    @Override
+    public ExpressionEvaluator getEvaluator(StructType inputSchema, Expression expression, DataType outputType) {
+        return new DeltaExpressionEvaluator(inputSchema, expression, outputType);
+    }
+
+    @Override
+    public PredicateEvaluator getPredicateEvaluator(StructType inputSchema, Predicate predicate) {
+        return new DeltaPredicateEvaluator(inputSchema, predicate);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 121a76b..44dfdf0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -22,8 +22,14 @@
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -39,15 +45,40 @@
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 import io.delta.kernel.Scan;
+import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.engine.ExpressionHandler;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Literal;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
 import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.internal.data.SelectionColumnVector;
+import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils;
+import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray;
+import io.delta.kernel.internal.util.ColumnMapping;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.internal.util.Tuple2;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
 
@@ -74,7 +105,7 @@
     public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
             String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
-        this.engine = DefaultEngine.create(conf);
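+        // Use the package-local DeltaEngine (rather than the kernel DefaultEngine); it is
+        // expected to wire in DeltaExpressionHandler so partition values are decoded by the
+        // Delta-aware evaluators in this package.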
+        this.engine = DeltaEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
@@ -94,7 +125,7 @@
             try {
                 this.physicalDataIter = engine.getParquetHandler()
                         .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
-                this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+                this.dataIter = transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
                 if (dataIter.hasNext()) {
                     rows = dataIter.next().getRows();
                 }
@@ -165,4 +196,182 @@
     public boolean handleException(Throwable th) {
         return false;
     }
+
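+    /**
+     * Local adaptation of {@code io.delta.kernel.Scan#transformPhysicalData}: deletion-vector and
+     * column-mapping handling is unchanged, but partition values are decoded through
+     * {@link #literalForPartitionValue}, which also understands ISO-8601 timestamps.
+     */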
+    static CloseableIterator<FilteredColumnarBatch> transformPhysicalData(Engine engine, Row scanState, Row scanFile,
+            CloseableIterator<ColumnarBatch> physicalDataIter) throws IOException {
+        return new CloseableIterator<FilteredColumnarBatch>() {
+            boolean inited = false;
+
+            // initialized as part of initIfRequired()
+            StructType physicalReadSchema = null;
+            StructType logicalReadSchema = null;
+            String tablePath = null;
+
+            RoaringBitmapArray currBitmap = null;
+            DeletionVectorDescriptor currDV = null;
+
+            private void initIfRequired() {
+                if (inited) {
+                    return;
+                }
+                physicalReadSchema = ScanStateRow.getPhysicalSchema(engine, scanState);
+                logicalReadSchema = ScanStateRow.getLogicalSchema(engine, scanState);
+
+                tablePath = ScanStateRow.getTableRoot(scanState);
+                inited = true;
+            }
+
+            @Override
+            public void close() throws IOException {
+                physicalDataIter.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                initIfRequired();
+                return physicalDataIter.hasNext();
+            }
+
+            @Override
+            public FilteredColumnarBatch next() {
+                initIfRequired();
+                ColumnarBatch nextDataBatch = physicalDataIter.next();
+
+                DeletionVectorDescriptor dv = InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFile);
+
+                int rowIndexOrdinal = nextDataBatch.getSchema().indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME);
+
+                // Get the selectionVector if DV is present
+                Optional<ColumnVector> selectionVector;
+                if (dv == null) {
+                    selectionVector = Optional.empty();
+                } else {
+                    if (rowIndexOrdinal == -1) {
+                        throw new IllegalArgumentException(
+                                "Row index column is not present in the data read from the Parquet file.");
+                    }
+                    if (!dv.equals(currDV)) {
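+                        // Load and cache the DV bitmap only when the descriptor changes between
+                        // batches; consecutive batches from the same scan file share one DV.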
+                        Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> dvInfo =
+                                DeletionVectorUtils.loadNewDvAndBitmap(engine, tablePath, dv);
+                        this.currDV = dvInfo._1;
+                        this.currBitmap = dvInfo._2;
+                    }
+                    ColumnVector rowIndexVector = nextDataBatch.getColumnVector(rowIndexOrdinal);
+                    selectionVector = Optional.of(new SelectionColumnVector(currBitmap, rowIndexVector));
+                }
+                if (rowIndexOrdinal != -1) {
+                    nextDataBatch = nextDataBatch.withDeletedColumnAt(rowIndexOrdinal);
+                }
+
+                // Add partition columns
+                nextDataBatch = withPartitionColumns(engine.getExpressionHandler(), nextDataBatch,
+                        InternalScanFileUtils.getPartitionValues(scanFile), physicalReadSchema);
+
+                // Change back to logical schema
+                String columnMappingMode = ScanStateRow.getColumnMappingMode(scanState);
+                switch (columnMappingMode) {
+                    case ColumnMapping.COLUMN_MAPPING_MODE_NAME:
+                    case ColumnMapping.COLUMN_MAPPING_MODE_ID:
+                        nextDataBatch = nextDataBatch.withNewSchema(logicalReadSchema);
+                        break;
+                    case ColumnMapping.COLUMN_MAPPING_MODE_NONE:
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Column mapping mode is not yet supported: " + columnMappingMode);
+                }
+
+                return new FilteredColumnarBatch(nextDataBatch, selectionVector);
+            }
+        };
+    }
+
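+    /**
+     * Attaches a constant column vector for every partition column present in
+     * {@code partitionValues}, at that column's ordinal in {@code schemaWithPartitionCols}.
+     */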
+    public static ColumnarBatch withPartitionColumns(ExpressionHandler expressionHandler, ColumnarBatch dataBatch,
+            Map<String, String> partitionValues, StructType schemaWithPartitionCols) {
+        if (partitionValues == null || partitionValues.isEmpty()) {
+            // No partition columns to attach.
+            return dataBatch;
+        }
+
+        for (int colIdx = 0; colIdx < schemaWithPartitionCols.length(); colIdx++) {
+            StructField structField = schemaWithPartitionCols.at(colIdx);
+
+            if (partitionValues.containsKey(structField.getName())) {
+                // Create a partition vector
+
+                ExpressionEvaluator evaluator = expressionHandler.getEvaluator(dataBatch.getSchema(),
+                        literalForPartitionValue(structField.getDataType(), partitionValues.get(structField.getName())),
+                        structField.getDataType());
+
+                ColumnVector partitionVector = evaluator.eval(dataBatch);
+                dataBatch = dataBatch.withNewColumn(colIdx, structField, partitionVector);
+            }
+        }
+
+        return dataBatch;
+    }
+
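+    /**
+     * Decodes a serialized partition value string into a {@link Literal} of the given type.
+     * Unlike the kernel default, timestamp values may be in either the
+     * "yyyy-MM-dd HH:mm:ss[.f...]" form or the ISO-8601 instant form.
+     */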
+    protected static Literal literalForPartitionValue(DataType dataType, String partitionValue) {
+        if (partitionValue == null) {
+            return Literal.ofNull(dataType);
+        }
+
+        if (dataType instanceof BooleanType) {
+            return Literal.ofBoolean(Boolean.parseBoolean(partitionValue));
+        }
+        if (dataType instanceof ByteType) {
+            return Literal.ofByte(Byte.parseByte(partitionValue));
+        }
+        if (dataType instanceof ShortType) {
+            return Literal.ofShort(Short.parseShort(partitionValue));
+        }
+        if (dataType instanceof IntegerType) {
+            return Literal.ofInt(Integer.parseInt(partitionValue));
+        }
+        if (dataType instanceof LongType) {
+            return Literal.ofLong(Long.parseLong(partitionValue));
+        }
+        if (dataType instanceof FloatType) {
+            return Literal.ofFloat(Float.parseFloat(partitionValue));
+        }
+        if (dataType instanceof DoubleType) {
+            return Literal.ofDouble(Double.parseDouble(partitionValue));
+        }
+        if (dataType instanceof StringType) {
+            return Literal.ofString(partitionValue);
+        }
+        if (dataType instanceof BinaryType) {
+            return Literal.ofBinary(partitionValue.getBytes());
+        }
+        if (dataType instanceof DateType) {
+            return Literal.ofDate(InternalUtils.daysSinceEpoch(Date.valueOf(partitionValue)));
+        }
+        if (dataType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) dataType;
+            return Literal.ofDecimal(new BigDecimal(partitionValue), decimalType.getPrecision(),
+                    decimalType.getScale());
+        }
+        if (dataType instanceof TimestampType) {
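+            // Try the "yyyy-MM-dd HH:mm:ss[.f...]" form first; fall back to ISO-8601 instants,
+            // e.g. "2021-01-01T00:00:00Z" -> 1609459200000000L microseconds since the epoch.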
+            try {
+                Timestamp timestamp = Timestamp.valueOf(partitionValue);
+                return Literal.ofTimestamp(InternalUtils.microsSinceEpoch(timestamp));
+            } catch (IllegalArgumentException e) {
+                Instant instant = Instant.parse(partitionValue);
+                return Literal.ofTimestamp(ChronoUnit.MICROS.between(Instant.EPOCH, instant));
+            }
+        }
+        if (dataType instanceof TimestampNTZType) {
+            // timestamp_ntz values in the "yyyy-MM-dd HH:mm:ss[.f...]" form carry no timezone and
+            // are interpreted in the local time zone; ISO-8601 values carry their own offset.
+            try {
+                Timestamp timestamp = Timestamp.valueOf(partitionValue);
+                return Literal.ofTimestampNtz(InternalUtils.microsSinceEpoch(timestamp));
+            } catch (IllegalArgumentException e) {
+                Instant instant = Instant.parse(partitionValue);
+                return Literal.ofTimestampNtz(ChronoUnit.MICROS.between(Instant.EPOCH, instant));
+            }
+        }
+
+        throw new UnsupportedOperationException("Unsupported partition column type: " + dataType);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
new file mode 100644
index 0000000..494d3bc
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.Optional;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.PredicateEvaluator;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
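+/**
+ * Evaluates a {@link Predicate} against a {@link ColumnarBatch} while honoring an existing
+ * selection vector: the predicate is rewritten to
+ * {@code (____existing_selection_vector_value____ = true) AND predicate} over an input schema
+ * extended with that boolean column.
+ */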
+public class DeltaPredicateEvaluator implements PredicateEvaluator {
+
+    private final ExpressionEvaluator expressionEvaluator;
+    private static final String EXISTING_SEL_VECTOR_COL_NAME = "____existing_selection_vector_value____";
+    private static final StructField EXISTING_SEL_VECTOR_FIELD =
+            new StructField(EXISTING_SEL_VECTOR_COL_NAME, BooleanType.BOOLEAN, false);
+
+    public DeltaPredicateEvaluator(StructType inputSchema, Predicate predicate) {
+        Predicate rewrittenPredicate = new And(
+                new Predicate("=", new Column(EXISTING_SEL_VECTOR_COL_NAME), Literal.ofBoolean(true)), predicate);
+        StructType rewrittenInputSchema = inputSchema.add(EXISTING_SEL_VECTOR_FIELD);
+        this.expressionEvaluator =
+                new DeltaExpressionEvaluator(rewrittenInputSchema, rewrittenPredicate, BooleanType.BOOLEAN);
+    }
+
+    @Override
+    public ColumnVector eval(ColumnarBatch inputData, Optional<ColumnVector> existingSelectionVector) {
+        try {
+            ColumnVector newVector = existingSelectionVector
+                    .orElse(new DefaultConstantVector(BooleanType.BOOLEAN, inputData.getSize(), true));
+            ColumnarBatch withExistingSelVector =
+                    inputData.withNewColumn(inputData.getSchema().length(), EXISTING_SEL_VECTOR_FIELD, newVector);
+
+            return expressionEvaluator.eval(withExistingSelVector);
+        } finally {
+            // release the existing selection vector.
+            Utils.closeCloseables(existingSelectionVector.orElse(null));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index b76dd4d..10a527e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -60,7 +60,6 @@
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.KernelEngineException;
 import io.delta.kernel.exceptions.KernelException;
@@ -104,7 +103,7 @@
         configureJobConf(appCtx, conf, configuration);
         confFactory = new ConfFactory(conf);
         String tableMetadataPath = getTablePath(configuration);
-        Engine engine = DefaultEngine.create(conf);
+        Engine engine = DeltaEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
         Snapshot snapshot;
         try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
new file mode 100644
index 0000000..388a287
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.defaults.internal.DefaultEngineErrors.unsupportedExpressionException;
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static org.apache.asterix.external.input.record.reader.aws.delta.ImplicitCastExpression.canCastTo;
+
+import java.util.Arrays;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.ScalarExpression;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.StringType;
+
+/**
+ * Utility methods to evaluate the {@code element_at} expression.
+ */
+class ElementAtEvaluator {
+    private ElementAtEvaluator() {
+    }
+
+    /**
+     * Validate and transform the {@code element_at} expression, given its already validated and
+     * transformed inputs.
+     */
+    static ScalarExpression validateAndTransform(ScalarExpression elementAt, Expression mapInput, DataType mapInputType,
+            Expression lookupKey, DataType lookupKeyType) {
+
+        MapType asMapType = validateSupportedMapType(elementAt, mapInputType);
+        DataType keyTypeFromMapInput = asMapType.getKeyType();
+
+        if (!keyTypeFromMapInput.equivalent(lookupKeyType)) {
+            if (canCastTo(lookupKeyType, keyTypeFromMapInput)) {
+                lookupKey = new ImplicitCastExpression(lookupKey, keyTypeFromMapInput);
+            } else {
+                String reason = format("lookup key type (%s) is different from the map key type (%s)", lookupKeyType,
+                        asMapType.getKeyType());
+                throw unsupportedExpressionException(elementAt, reason);
+            }
+        }
+        return new ScalarExpression(elementAt.getName(), Arrays.asList(mapInput, lookupKey));
+    }
+
+    /**
+     * Utility method to evaluate the {@code element_at} on given map and key vectors.
+     * @param map {@link ColumnVector} of {@code map(string, string)} type.
+     * @param lookupKey {@link ColumnVector} of {@code string} type.
+     * @return result {@link ColumnVector} containing the lookup values.
+     */
+    static ColumnVector eval(ColumnVector map, ColumnVector lookupKey) {
+        return new ColumnVector() {
+            // Cache the last looked-up value to avoid repeated lookups for the same row id.
+            // The usual access pattern is a call to `isNullAt(rowId)` followed by `getString(rowId)`,
+            // so caching a single value is enough.
+            private int lastLookupRowId = -1;
+            private String lastLookupValue = null;
+
+            @Override
+            public DataType getDataType() {
+                return ((MapType) map.getDataType()).getValueType();
+            }
+
+            @Override
+            public int getSize() {
+                return map.getSize();
+            }
+
+            @Override
+            public void close() {
+                Utils.closeCloseables(map, lookupKey);
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                if (rowId == lastLookupRowId) {
+                    return lastLookupValue == null;
+                }
+                return map.isNullAt(rowId) || lookupValue(rowId) == null;
+            }
+
+            @Override
+            public String getString(int rowId) {
+                lookupValue(rowId);
+                return lastLookupValue;
+            }
+
+            private String lookupValue(int rowId) {
+                if (rowId == lastLookupRowId) {
+                    return lastLookupValue;
+                }
+                lastLookupRowId = rowId;
+                String keyValue = lookupKey.getString(rowId);
+                lastLookupValue = findValueForKey(map.getMap(rowId), keyValue);
+                return lastLookupValue;
+            }
+
+            /**
+             * Given a {@link MapValue} and a string {@code key}, find the corresponding value.
+             * Returns null if the key is not in the map.
+             * @param mapValue String->String map to search
+             * @param key the key to look up the value for; may be null
+             */
+            private String findValueForKey(MapValue mapValue, String key) {
+                ColumnVector keyVector = mapValue.getKeys();
+                for (int i = 0; i < mapValue.getSize(); i++) {
+                    if ((keyVector.isNullAt(i) && key == null)
+                            || (!keyVector.isNullAt(i) && keyVector.getString(i).equals(key))) {
+                        return mapValue.getValues().isNullAt(i) ? null : mapValue.getValues().getString(i);
+                    }
+                }
+                // If the key is not in the map return null
+                return null;
+            }
+        };
+    }
+
+    private static MapType validateSupportedMapType(Expression elementAt, DataType mapInputType) {
+        checkArgument(mapInputType instanceof MapType, "expected a map type input as first argument: " + elementAt);
+        MapType asMapType = (MapType) mapInputType;
+        // For now we only need to support lookups in columns of type `map(string, string)`;
+        // additional key/value types may be added later.
+        if (asMapType.getKeyType().equivalent(StringType.STRING)
+                && asMapType.getValueType().equivalent(StringType.STRING)) {
+            return asMapType;
+        }
+        throw new UnsupportedOperationException(
+                format("%s: Supported only on type map(string, string) input data", elementAt));
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
new file mode 100644
index 0000000..7e2a65b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE;
+import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;
+import static java.util.stream.Collectors.joining;
+
+import java.util.List;
+import java.util.Locale;
+
+import io.delta.kernel.expressions.AlwaysFalse;
+import io.delta.kernel.expressions.AlwaysTrue;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Or;
+import io.delta.kernel.expressions.PartitionValueExpression;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.ScalarExpression;
+
+/**
+ * Visitor over an expression tree, with a handler for each specific expression type.
+ *
+ * @param <R> return type of the visit methods.
+ */
+abstract class ExpressionVisitor<R> {
+
+    abstract R visitAnd(And and);
+
+    abstract R visitOr(Or or);
+
+    abstract R visitAlwaysTrue(AlwaysTrue alwaysTrue);
+
+    abstract R visitAlwaysFalse(AlwaysFalse alwaysFalse);
+
+    abstract R visitComparator(Predicate predicate);
+
+    abstract R visitLiteral(Literal literal);
+
+    abstract R visitColumn(Column column);
+
+    abstract R visitCast(ImplicitCastExpression cast);
+
+    abstract R visitPartitionValue(PartitionValueExpression partitionValue);
+
+    abstract R visitElementAt(ScalarExpression elementAt);
+
+    abstract R visitNot(Predicate predicate);
+
+    abstract R visitIsNotNull(Predicate predicate);
+
+    abstract R visitIsNull(Predicate predicate);
+
+    abstract R visitCoalesce(ScalarExpression ifNull);
+
+    final R visit(Expression expression) {
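+        // PartitionValueExpression is itself a ScalarExpression, so it must be matched before
+        // the generic ScalarExpression case.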
+        if (expression instanceof PartitionValueExpression) {
+            return visitPartitionValue((PartitionValueExpression) expression);
+        } else if (expression instanceof ScalarExpression) {
+            return visitScalarExpression((ScalarExpression) expression);
+        } else if (expression instanceof Literal) {
+            return visitLiteral((Literal) expression);
+        } else if (expression instanceof Column) {
+            return visitColumn((Column) expression);
+        } else if (expression instanceof ImplicitCastExpression) {
+            return visitCast((ImplicitCastExpression) expression);
+        }
+
+        throw new UnsupportedOperationException(String.format("Expression %s is not supported.", expression));
+    }
+
+    private R visitScalarExpression(ScalarExpression expression) {
+        List<Expression> children = expression.getChildren();
+        String name = expression.getName().toUpperCase(Locale.ENGLISH);
+        switch (name) {
+            case "ALWAYS_TRUE":
+                return visitAlwaysTrue(ALWAYS_TRUE);
+            case "ALWAYS_FALSE":
+                return visitAlwaysFalse(ALWAYS_FALSE);
+            case "AND":
+                return visitAnd(new And(elemAsPredicate(children, 0), elemAsPredicate(children, 1)));
+            case "OR":
+                return visitOr(new Or(elemAsPredicate(children, 0), elemAsPredicate(children, 1)));
+            case "=":
+            case "<":
+            case "<=":
+            case ">":
+            case ">=":
+                return visitComparator(new Predicate(name, children));
+            case "ELEMENT_AT":
+                return visitElementAt(expression);
+            case "NOT":
+                return visitNot(new Predicate(name, children));
+            case "IS_NOT_NULL":
+                return visitIsNotNull(new Predicate(name, children));
+            case "IS_NULL":
+                return visitIsNull(new Predicate(name, children));
+            case "COALESCE":
+                return visitCoalesce(expression);
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Scalar expression `%s` is not supported.", name));
+        }
+    }
+
+    private static Predicate elemAsPredicate(List<Expression> expressions, int index) {
+        if (expressions.size() <= index) {
+            throw new RuntimeException(String.format("Trying to access invalid entry (%d) in list %s", index,
+                    expressions.stream().map(Object::toString).collect(joining(","))));
+        }
+        Expression elemExpression = expressions.get(index);
+        if (!(elemExpression instanceof Predicate)) {
+            throw new RuntimeException("Expected a predicate, but got " + elemExpression);
+        }
+        return (Predicate) elemExpression;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
new file mode 100644
index 0000000..febd4d3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static java.lang.String.format;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.types.DataType;
+
+/**
+ * An implicit cast expression that converts the input to another given type. The supported
+ * casts are:
+ * <p>
+ *  <ul>
+ *    <li>{@code byte} to {@code short, int, long, float, double}</li>
+ *    <li>{@code short} to {@code int, long, float, double}</li>
+ *    <li>{@code int} to {@code long, float, double}</li>
+ *    <li>{@code long} to {@code float, double}</li>
+ *    <li>{@code float} to {@code double}</li>
+ *  </ul>
+ *
+ * <p>
+ * The list is not exhaustive; more casts can be added as needed.
+ */
+final class ImplicitCastExpression implements Expression {
+    private final Expression input;
+    private final DataType outputType;
+
+    /**
+     * Create a cast around the given input expression to the specified output data type.
+     * It is the caller's responsibility to validate that the input can be cast to the new type
+     * using {@link #canCastTo(DataType, DataType)}.
+     */
+    ImplicitCastExpression(Expression input, DataType outputType) {
+        this.input = requireNonNull(input, "input is null");
+        this.outputType = requireNonNull(outputType, "outputType is null");
+    }
+
+    public Expression getInput() {
+        return input;
+    }
+
+    public DataType getOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.singletonList(input);
+    }
+
+    /**
+     * Evaluate the given column expression on the input {@link ColumnVector}.
+     *
+     * @param input {@link ColumnVector} data of the input to the cast expression.
+     * @return {@link ColumnVector} result of applying the target-type cast to every element of
+     * the input {@link ColumnVector}.
+     */
+    ColumnVector eval(ColumnVector input) {
+        String fromTypeStr = input.getDataType().toString();
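+        // Dispatch on the type name string (e.g. "integer"); see the TODO in canCastTo about
+        // DataType lacking a first-class name accessor.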
+        switch (fromTypeStr) {
+            case "byte":
+                return new ByteUpConverter(outputType, input);
+            case "short":
+                return new ShortUpConverter(outputType, input);
+            case "integer":
+                return new IntUpConverter(outputType, input);
+            case "long":
+                return new LongUpConverter(outputType, input);
+            case "float":
+                return new FloatUpConverter(outputType, input);
+            default:
+                throw new UnsupportedOperationException(format("Cast from %s is not supported", fromTypeStr));
+        }
+    }
+
+    /**
+     * Map from each source type name to the list of type names it can be implicitly up-cast to.
+     */
+    private static final Map<String, List<String>> UP_CASTABLE_TYPE_TABLE;
+
+    static {
+        Map<String, List<String>> map = new HashMap<>();
+        map.put("byte", Arrays.asList("short", "integer", "long", "float", "double"));
+        map.put("short", Arrays.asList("integer", "long", "float", "double"));
+        map.put("integer", Arrays.asList("long", "float", "double"));
+        map.put("long", Arrays.asList("float", "double"));
+        map.put("float", Arrays.asList("double"));
+        UP_CASTABLE_TYPE_TABLE = unmodifiableMap(map);
+    }
+
+    /**
+     * Utility method which returns whether the given {@code from} type can be cast to {@code to}
+     * type.
+     */
+    static boolean canCastTo(DataType from, DataType to) {
+        // TODO: the type name should be a first-class method on `DataType` instead of being
+        // derived via `toString`.
+        String fromStr = from.toString();
+        String toStr = to.toString();
+        return UP_CASTABLE_TYPE_TABLE.containsKey(fromStr) && UP_CASTABLE_TYPE_TABLE.get(fromStr).contains(toStr);
+    }
+
+    /**
+     * Base class for up casting {@link ColumnVector} data.
+     */
+    private abstract static class UpConverter implements ColumnVector {
+        protected final DataType targetType;
+        protected final ColumnVector inputVector;
+
+        UpConverter(DataType targetType, ColumnVector inputVector) {
+            this.targetType = targetType;
+            this.inputVector = inputVector;
+        }
+
+        @Override
+        public DataType getDataType() {
+            return targetType;
+        }
+
+        @Override
+        public boolean isNullAt(int rowId) {
+            return inputVector.isNullAt(rowId);
+        }
+
+        @Override
+        public int getSize() {
+            return inputVector.getSize();
+        }
+
+        @Override
+        public void close() {
+            inputVector.close();
+        }
+    }
+
+    private static class ByteUpConverter extends UpConverter {
+        ByteUpConverter(DataType targetType, ColumnVector inputVector) {
+            super(targetType, inputVector);
+        }
+
+        @Override
+        public short getShort(int rowId) {
+            return inputVector.getByte(rowId);
+        }
+
+        @Override
+        public int getInt(int rowId) {
+            return inputVector.getByte(rowId);
+        }
+
+        @Override
+        public long getLong(int rowId) {
+            return inputVector.getByte(rowId);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return inputVector.getByte(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return inputVector.getByte(rowId);
+        }
+    }
+
+    private static class ShortUpConverter extends UpConverter {
+        ShortUpConverter(DataType targetType, ColumnVector inputVector) {
+            super(targetType, inputVector);
+        }
+
+        @Override
+        public int getInt(int rowId) {
+            return inputVector.getShort(rowId);
+        }
+
+        @Override
+        public long getLong(int rowId) {
+            return inputVector.getShort(rowId);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return inputVector.getShort(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return inputVector.getShort(rowId);
+        }
+    }
+
+    private static class IntUpConverter extends UpConverter {
+        IntUpConverter(DataType targetType, ColumnVector inputVector) {
+            super(targetType, inputVector);
+        }
+
+        @Override
+        public long getLong(int rowId) {
+            return inputVector.getInt(rowId);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return inputVector.getInt(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return inputVector.getInt(rowId);
+        }
+    }
+
+    private static class LongUpConverter extends UpConverter {
+        LongUpConverter(DataType targetType, ColumnVector inputVector) {
+            super(targetType, inputVector);
+        }
+
+        @Override
+        public float getFloat(int rowId) {
+            return inputVector.getLong(rowId);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return inputVector.getLong(rowId);
+        }
+    }
+
+    private static class FloatUpConverter extends UpConverter {
+        FloatUpConverter(DataType targetType, ColumnVector inputVector) {
+            super(targetType, inputVector);
+        }
+
+        @Override
+        public double getDouble(int rowId) {
+            return inputVector.getFloat(rowId);
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
new file mode 100644
index 0000000..c24b225
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Utility methods to evaluate the {@code partition_value} expression.
+ */
+class PartitionValueEvaluator {
+    /**
+     * Evaluate the {@code partition_value} expression for the given input column vector and generate
+     * a column vector with decoded values according to the given partition type.
+     */
+    static ColumnVector eval(ColumnVector input, DataType partitionType) {
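+        // The returned vector parses the underlying string partition value lazily, on each
+        // accessor call; nothing is materialized up front.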
+        return new ColumnVector() {
+            @Override
+            public DataType getDataType() {
+                return partitionType;
+            }
+
+            @Override
+            public int getSize() {
+                return input.getSize();
+            }
+
+            @Override
+            public void close() {
+                input.close();
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return input.isNullAt(rowId);
+            }
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return Boolean.parseBoolean(input.getString(rowId));
+            }
+
+            @Override
+            public byte getByte(int rowId) {
+                return Byte.parseByte(input.getString(rowId));
+            }
+
+            @Override
+            public short getShort(int rowId) {
+                return Short.parseShort(input.getString(rowId));
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                if (partitionType.equivalent(IntegerType.INTEGER)) {
+                    return Integer.parseInt(input.getString(rowId));
+                } else if (partitionType.equivalent(DateType.DATE)) {
+                    return InternalUtils.daysSinceEpoch(Date.valueOf(input.getString(rowId)));
+                }
+                throw new UnsupportedOperationException("Invalid value request for data type");
+            }
+
+            @Override
+            public long getLong(int rowId) {
+                if (partitionType.equivalent(LongType.LONG)) {
+                    return Long.parseLong(input.getString(rowId));
+                } else if (partitionType.equivalent(TimestampType.TIMESTAMP)
+                        || partitionType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) {
+                    // Timestamp and timestamp_ntz values in the "yyyy-MM-dd HH:mm:ss[.f...]" form
+                    // carry no timezone and are interpreted in the local time zone; ISO-8601
+                    // values (e.g. "2024-01-01T00:00:00Z") carry their own offset.
+                    try {
+                        Timestamp timestamp = Timestamp.valueOf(input.getString(rowId));
+                        return InternalUtils.microsSinceEpoch(timestamp);
+                    } catch (IllegalArgumentException e) {
+                        Instant instant = Instant.parse(input.getString(rowId));
+                        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
+                    }
+                }
+                throw new UnsupportedOperationException("Invalid value request for data type");
+            }
+
+            @Override
+            public float getFloat(int rowId) {
+                return Float.parseFloat(input.getString(rowId));
+            }
+
+            @Override
+            public double getDouble(int rowId) {
+                return Double.parseDouble(input.getString(rowId));
+            }
+
+            @Override
+            public byte[] getBinary(int rowId) {
+                return input.isNullAt(rowId) ? null : input.getString(rowId).getBytes();
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return input.getString(rowId);
+            }
+
+            @Override
+            public BigDecimal getDecimal(int rowId) {
+                return input.isNullAt(rowId) ? null : new BigDecimal(input.getString(rowId));
+            }
+        };
+    }
+}