Merge branch 'gerrit/goldfish' into 'master'

Change-Id: Ic8b85f85eb80fcfd046ebfca721de09c0ad5c753
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
index f041fa2..1236636 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java
@@ -196,7 +196,7 @@
             Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>())
                     .schema(new StructType().add(new StructField("id", new IntegerType(), true))
                             .add(new StructField("name", new StringType(), true))
-                            .add(new StructField("age", new IntegerType(), true)))
+                            .add(new StructField("age", new StringType(), true)))
                     .build();
             txn.updateMetadata(metaData);
             txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create");
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
index 2e77beb..63a83d1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp
@@ -30,6 +30,6 @@
  (
    %template%,
    ("container"="playground"),
-   ("definition"="my-table-empty"),
+   ("definition"="delta-data/empty_delta_table"),
    ("table-format" = "delta")
  );
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8c8ad10..27c3ac1 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -579,6 +579,22 @@
     </dependency>
     <dependency>
       <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-api</artifactId>
+      <version>3.2.1</version>
+    </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-defaults</artifactId>
+      <version>3.2.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
       <artifactId>delta-standalone_2.12</artifactId>
       <version>3.0.0</version>
     </dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index f5a2cd5..9909cc3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,35 +18,70 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
-import static org.apache.asterix.external.util.ExternalDataUtils.prepareDeltaTableFormat;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
-public class AwsS3DeltaReaderFactory extends AwsS3ParquetReaderFactory {
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.utils.CloseableIterator;
+
+public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
 
     private static final long serialVersionUID = 1L;
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+    private static final Logger LOGGER = LogManager.getLogger();
+    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+    private Map<Integer, List<String>> schedule;
+    private String scanState;
+    private Map<String, String> configuration;
+    private List<String> scanFiles;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return locationConstraints;
+    }
 
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
-
+        this.configuration = configuration;
         Configuration conf = new Configuration();
         conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
         conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        }
         conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
         String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
         if (serviceEndpoint != null) {
@@ -56,8 +91,77 @@
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
 
-        prepareDeltaTableFormat(configuration, conf, tableMetadataPath);
-        super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+
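+        // Build a Delta Kernel scan over the table's latest snapshot. The scan state and the
+        // scan-file rows are serialized to JSON so they can be shipped to the compute partitions.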
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        Snapshot snapshot = table.getLatestSnapshot(engine);
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build();
+        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+        CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
+
+        scanFiles = new ArrayList<>();
+        while (iter.hasNext()) {
+            FilteredColumnarBatch batch = iter.next();
+            CloseableIterator<Row> rowIter = batch.getRows();
+            while (rowIter.hasNext()) {
+                Row row = rowIter.next();
+                scanFiles.add(RowSerDe.serializeRowToJson(row));
+            }
+        }
+        locationConstraints = configureLocationConstraints(appCtx);
+        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+        distributeFiles();
+    }
+
+    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) {
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+
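+        // Use at most one location per scan file: an empty table reads from a single random
+        // location, and a surplus of cluster locations is trimmed to a random subset.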
+        String[] locations = csm.getClusterLocations().getLocations();
+        if (scanFiles.isEmpty()) {
+            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+        } else if (locations.length > scanFiles.size()) {
+            LOGGER.debug(
+                    "cluster locations ({}) exceed the scan file count ({}); limiting read partitions to the scan file count",
+                    locations.length, scanFiles.size());
+            final String[] locationCopy = locations.clone();
+            ArrayUtils.shuffle(locationCopy);
+            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations);
+    }
+
+    private void distributeFiles() {
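+        // Deal the scan files round-robin across the compute partitions so every reader
+        // receives a near-equal share.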
+        final int numComputePartitions = getPartitionConstraint().getLocations().length;
+        schedule = new HashMap<>();
+        for (int i = 0; i < numComputePartitions; i++) {
+            schedule.put(i, new ArrayList<>());
+        }
+        int i = 0;
+        for (String scanFile : scanFiles) {
+            schedule.get(i).add(scanFile);
+            i = (i + 1) % numComputePartitions;
+        }
+    }
+
+    @Override
+    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+        try {
+            int partition = context.getPartition();
+            return new DeltaFileRecordReader(schedule.get(partition), scanState, configuration, context);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws AsterixException {
+        return Row.class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java
new file mode 100644
index 0000000..13c3d0c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DataTypeJsonSerDe.java
@@ -0,0 +1,511 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BasePrimitiveType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.FieldMetadata;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+/**
+ * Serialize and deserialize Delta data types {@link DataType} to JSON and from JSON class based on
+ * the <a
+ * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types">serialization
+ * rules </a> outlined in the Delta Protocol.
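+ *
+ * <p>For example, an array of nullable integers would round-trip as the JSON object
+ * {@code {"type":"array","elementType":"integer","containsNull":true}}.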
+ */
+public class DataTypeJsonSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .registerModule(new SimpleModule().addSerializer(StructType.class, new StructTypeSerializer()));
+
+    private DataTypeJsonSerDe() {
+    }
+
+    /**
+     * Serializes a {@link DataType} to a JSON string according to the Delta Protocol.
+     * TODO: this API was only added because of the Flink-Kernel dependency; Flink-Kernel currently
+     * uses the Kernel DataType.toJson and the Standalone DataType.fromJson to convert between types.
+     *
+     * @param dataType
+     * @return JSON string representing the data type
+     */
+    public static String serializeDataType(DataType dataType) {
+        try {
+            StringWriter stringWriter = new StringWriter();
+            JsonGenerator generator = OBJECT_MAPPER.createGenerator(stringWriter);
+            writeDataType(generator, dataType);
+            generator.flush();
+            return stringWriter.toString();
+        } catch (IOException ex) {
+            throw new KernelException("Could not serialize DataType to JSON", ex);
+        }
+    }
+
+    /**
+     * Deserializes a JSON string representing a Delta data type to a {@link DataType}.
+     *
+     * @param structTypeJson JSON string representing a {@link StructType} data type
+     */
+    public static StructType deserializeStructType(String structTypeJson) {
+        try {
+            DataType parsedType = parseDataType(OBJECT_MAPPER.reader().readTree(structTypeJson), "" /* fieldPath */,
+                    new FieldMetadata.Builder().build() /* collationsMetadata */);
+            if (parsedType instanceof StructType) {
+                return (StructType) parsedType;
+            } else {
+                throw new IllegalArgumentException(
+                        String.format("Could not parse the following JSON as a valid StructType:\n%s", structTypeJson));
+            }
+        } catch (JsonProcessingException ex) {
+            throw new KernelException(format("Could not parse schema given as JSON string: %s", structTypeJson), ex);
+        }
+    }
+
+    /**
+     * Parses a Delta data type from JSON. Data types can either be serialized as strings (for
+     * primitive types) or as objects (for complex types).
+     *
+     * <p>For example:
+     *
+     * <pre>
+     * // Map type field is serialized as:
+     * {
+     *   "name" : "f",
+     *   "type" : {
+     *     "type" : "map",
+     *     "keyType" : "string",
+     *     "valueType" : "string",
+     *     "valueContainsNull" : true
+     *   },
+     *   "nullable" : true,
+     *   "metadata" : { }
+     * }
+     *
+     * // Integer type field serialized as:
+     * {
+     *   "name" : "a",
+     *   "type" : "integer",
+     *   "nullable" : false,
+     *   "metadata" : { }
+     * }
+     *
+     * // Collated string type field serialized as:
+     * {
+     *   "name" : "s",
+     *   "type" : "string",
+     *   "nullable", false,
+     *   "metadata" : {
+     *     "__COLLATIONS": { "s": "ICU.de_DE" }
+     *   }
+     * }
+     *
+     * // Array with collated strings field serialized as:
+     * {
+     *   "name" : "arr",
+     *   "type" : {
+     *     "type" : "array",
+     *     "elementType" : "string",
+     *     "containsNull" : false
+     *   },
+     *   "nullable" : false,
+     *   "metadata" : {
+     *     "__COLLATIONS": { "arr.element": "ICU.de_DE" }
+     *   }
+     * }
+     * </pre>
+     *
+     * @param fieldPath Path from the nearest ancestor that is of the {@link StructField} type. For
+     *     example, "c1.key.element" represents a path starting from the {@link StructField} named
+     *     "c1." The next element, "key," indicates that "c1" stores a {@link MapType} type. The final
+     *     element, "element", shows that the key of the map is an {@link ArrayType} type.
+     * @param collationsMetadata Metadata that maps the path of a {@link StringType} to its collation.
+     *     Only maps non-UTF8_BINARY collated {@link StringType}. Collation metadata is stored in the
+     *     nearest ancestor, which is the StructField. This is because StructField includes a metadata
+     *     field, whereas Map and Array do not, making them unable to store this information. Paths
+     *     are in same form as `fieldPath`. <a
+     *     href="https://github.com/delta-io/delta/blob/master/protocol_rfcs/collated-string-type.md#collation-identifiers">Docs</a>
+     */
+    static DataType parseDataType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+        switch (json.getNodeType()) {
+            case STRING:
+                // simple types are stored as just a string
+                return nameToType(json.textValue(), fieldPath, collationsMetadata);
+            case OBJECT:
+                // complex types (array, map, or struct) are stored as JSON objects
+                String type = getStringField(json, "type");
+                switch (type) {
+                    case "struct":
+                        assertValidTypeForCollations(fieldPath, "struct", collationsMetadata);
+                        return parseStructType(json);
+                    case "array":
+                        assertValidTypeForCollations(fieldPath, "array", collationsMetadata);
+                        return parseArrayType(json, fieldPath, collationsMetadata);
+                    case "map":
+                        assertValidTypeForCollations(fieldPath, "map", collationsMetadata);
+                        return parseMapType(json, fieldPath, collationsMetadata);
+                    // No default case here; fall through to the following error when no match
+                }
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Could not parse the following JSON as a valid Delta data type:\n%s", json));
+        }
+    }
+
+    /**
+     * Parses an <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#array-type">array
+     * type </a>
+     */
+    private static ArrayType parseArrayType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+        checkArgument(json.isObject() && json.size() == 3,
+                "Expected JSON object with 3 fields for array data type but got:\n%s", json);
+        boolean containsNull = getBooleanField(json, "containsNull");
+        DataType dataType =
+                parseDataType(getNonNullField(json, "elementType"), fieldPath + ".element", collationsMetadata);
+        return new ArrayType(dataType, containsNull);
+    }
+
+    /**
+     * Parses a <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#map-type">map type
+     * </a>
+     */
+    private static MapType parseMapType(JsonNode json, String fieldPath, FieldMetadata collationsMetadata) {
+        checkArgument(json.isObject() && json.size() == 4,
+                "Expected JSON object with 4 fields for map data type but got:\n%s", json);
+        boolean valueContainsNull = getBooleanField(json, "valueContainsNull");
+        DataType keyType = parseDataType(getNonNullField(json, "keyType"), fieldPath + ".key", collationsMetadata);
+        DataType valueType =
+                parseDataType(getNonNullField(json, "valueType"), fieldPath + ".value", collationsMetadata);
+        return new MapType(keyType, valueType, valueContainsNull);
+    }
+
+    /**
+     * Parses a <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-type">
+     * struct type </a>
+     */
+    private static StructType parseStructType(JsonNode json) {
+        checkArgument(json.isObject() && json.size() == 2,
+                "Expected JSON object with 2 fields for struct data type but got:\n%s", json);
+        JsonNode fieldsNode = getNonNullField(json, "fields");
+        checkArgument(fieldsNode.isArray(), "Expected array for fieldName=%s in:\n%s", "fields", json);
+        Iterator<JsonNode> fields = fieldsNode.elements();
+        List<StructField> parsedFields = new ArrayList<>();
+        while (fields.hasNext()) {
+            parsedFields.add(parseStructField(fields.next()));
+        }
+        return new StructType(parsedFields);
+    }
+
+    /**
+     * Parses a <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#struct-field">
+     * struct field </a>
+     */
+    private static StructField parseStructField(JsonNode json) {
+        checkArgument(json.isObject(), "Expected JSON object for struct field");
+        String name = getStringField(json, "name");
+        FieldMetadata metadata = parseFieldMetadata(json.get("metadata"), false);
+        DataType type = parseDataType(getNonNullField(json, "type"), name, null);
+        boolean nullable = getBooleanField(json, "nullable");
+        return new StructField(name, type, nullable, metadata);
+    }
+
+    /** Parses a {@link FieldMetadata}. */
+    private static FieldMetadata parseFieldMetadata(JsonNode json) {
+        return parseFieldMetadata(json, true);
+    }
+
+    /**
+     * Parses a {@link FieldMetadata}, optionally including collation metadata, depending on
+     * {@code includeCollationsMetadata}.
+     */
+    private static FieldMetadata parseFieldMetadata(JsonNode json, boolean includeCollationsMetadata) {
+        if (json == null || json.isNull()) {
+            return FieldMetadata.empty();
+        }
+
+        checkArgument(json.isObject(), "Expected JSON object for struct field metadata");
+        final Iterator<Map.Entry<String, JsonNode>> iterator = json.fields();
+        final FieldMetadata.Builder builder = FieldMetadata.builder();
+        while (iterator.hasNext()) {
+            Map.Entry<String, JsonNode> entry = iterator.next();
+            JsonNode value = entry.getValue();
+            String key = entry.getKey();
+
+            if (value.isNull()) {
+                builder.putNull(key);
+            } else if (value.isIntegralNumber()) { // covers both int and long
+                builder.putLong(key, value.longValue());
+            } else if (value.isDouble()) {
+                builder.putDouble(key, value.doubleValue());
+            } else if (value.isBoolean()) {
+                builder.putBoolean(key, value.booleanValue());
+            } else if (value.isTextual()) {
+                builder.putString(key, value.textValue());
+            } else if (value.isObject()) {
+                builder.putFieldMetadata(key, parseFieldMetadata(value));
+            } else if (value.isArray()) {
+                final Iterator<JsonNode> fields = value.elements();
+                if (!fields.hasNext()) {
+                    // If it is an empty array, we cannot infer its element type.
+                    // We put an empty Array[Long].
+                    builder.putLongArray(key, new Long[0]);
+                } else {
+                    final JsonNode head = fields.next();
+                    if (head.isInt()) {
+                        builder.putLongArray(key,
+                                buildList(value, node -> (long) node.intValue()).toArray(new Long[0]));
+                    } else if (head.isDouble()) {
+                        builder.putDoubleArray(key, buildList(value, JsonNode::doubleValue).toArray(new Double[0]));
+                    } else if (head.isBoolean()) {
+                        builder.putBooleanArray(key, buildList(value, JsonNode::booleanValue).toArray(new Boolean[0]));
+                    } else if (head.isTextual()) {
+                        builder.putStringArray(key, buildList(value, JsonNode::textValue).toArray(new String[0]));
+                    } else if (head.isObject()) {
+                        builder.putFieldMetadataArray(key,
+                                buildList(value, DataTypeJsonSerDe::parseFieldMetadata).toArray(new FieldMetadata[0]));
+                    } else {
+                        throw new IllegalArgumentException(
+                                String.format("Unsupported type for Array as field metadata value: %s", value));
+                    }
+                }
+            } else {
+                throw new IllegalArgumentException(
+                        String.format("Unsupported type for field metadata value: %s", value));
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * For an array JSON node builds a {@link List} using the provided {@code accessor} for each
+     * element.
+     */
+    private static <T> List<T> buildList(JsonNode json, Function<JsonNode, T> accessor) {
+        List<T> result = new ArrayList<>();
+        Iterator<JsonNode> elements = json.elements();
+        while (elements.hasNext()) {
+            result.add(accessor.apply(elements.next()));
+        }
+        return result;
+    }
+
+    private static final String FIXED_DECIMAL_REGEX = "decimal\\(\\s*(\\d+)\\s*,\\s*(\\-?\\d+)\\s*\\)";
+    private static final Pattern FIXED_DECIMAL_PATTERN = Pattern.compile(FIXED_DECIMAL_REGEX);
+
+    /** Parses primitive string type names to a {@link DataType} */
+    private static DataType nameToType(String name, String fieldPath, FieldMetadata collationsMetadata) {
+        if (BasePrimitiveType.isPrimitiveType(name)) {
+            return BasePrimitiveType.createPrimitive(name);
+        } else if (name.equals("decimal")) {
+            return DecimalType.USER_DEFAULT;
+        } else if ("void".equalsIgnoreCase(name)) {
+            // Earlier versions of Delta had VOID type which is not specified in Delta Protocol.
+            // It is not readable or writable. Throw a user-friendly error message.
+            throw new IllegalArgumentException(String.format("%s is not a supported delta data type", name));
+        } else {
+            // decimal has a special pattern with a precision and scale
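+            // e.g. "decimal(10,2)" yields precision 10 and scale 2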
+            Matcher decimalMatcher = FIXED_DECIMAL_PATTERN.matcher(name);
+            if (decimalMatcher.matches()) {
+                int precision = Integer.parseInt(decimalMatcher.group(1));
+                int scale = Integer.parseInt(decimalMatcher.group(2));
+                return new DecimalType(precision, scale);
+            }
+
+            // We have encountered a type that is not part of the protocol specification.
+            // It must be an invalid type (according to the protocol), not a data type
+            // that Kernel merely does not support yet.
+            throw new IllegalArgumentException(String.format("%s is not a supported delta data type", name));
+        }
+    }
+
+    private static JsonNode getNonNullField(JsonNode rootNode, String fieldName) {
+        JsonNode node = rootNode.get(fieldName);
+        if (node == null || node.isNull()) {
+            throw new IllegalArgumentException(
+                    String.format("Expected non-null for fieldName=%s in:\n%s", fieldName, rootNode));
+        }
+        return node;
+    }
+
+    private static String getStringField(JsonNode rootNode, String fieldName) {
+        JsonNode node = getNonNullField(rootNode, fieldName);
+        checkArgument(node.isTextual(), "Expected string for fieldName=%s in:\n%s", fieldName, rootNode);
+        return node.textValue();
+    }
+
+    private static void assertValidTypeForCollations(String fieldPath, String fieldType,
+            FieldMetadata collationsMetadata) {
+
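+        // Collated string types are not supported here, so collation metadata is ignored.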
+    }
+
+
+    private static boolean getBooleanField(JsonNode rootNode, String fieldName) {
+        JsonNode node = getNonNullField(rootNode, fieldName);
+        checkArgument(node.isBoolean(), "Expected boolean for fieldName=%s in:\n%s", fieldName, rootNode);
+        return node.booleanValue();
+    }
+
+    protected static class StructTypeSerializer extends StdSerializer<StructType> {
+        private static final long serialVersionUID = 1L;
+
+        public StructTypeSerializer() {
+            super(StructType.class);
+        }
+
+        @Override
+        public void serialize(StructType structType, JsonGenerator gen, SerializerProvider provider)
+                throws IOException {
+            writeDataType(gen, structType);
+        }
+    }
+
+    private static void writeDataType(JsonGenerator gen, DataType dataType) throws IOException {
+        if (dataType instanceof StructType) {
+            writeStructType(gen, (StructType) dataType);
+        } else if (dataType instanceof ArrayType) {
+            writeArrayType(gen, (ArrayType) dataType);
+        } else if (dataType instanceof MapType) {
+            writeMapType(gen, (MapType) dataType);
+        } else if (dataType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) dataType;
+            gen.writeString(format("decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale()));
+        } else {
+            gen.writeString(dataType.toString());
+        }
+    }
+
+    private static void writeArrayType(JsonGenerator gen, ArrayType arrayType) throws IOException {
+        gen.writeStartObject();
+        gen.writeStringField("type", "array");
+        gen.writeFieldName("elementType");
+        writeDataType(gen, arrayType.getElementType());
+        gen.writeBooleanField("containsNull", arrayType.containsNull());
+        gen.writeEndObject();
+    }
+
+    private static void writeMapType(JsonGenerator gen, MapType mapType) throws IOException {
+        gen.writeStartObject();
+        gen.writeStringField("type", "map");
+        gen.writeFieldName("keyType");
+        writeDataType(gen, mapType.getKeyType());
+        gen.writeFieldName("valueType");
+        writeDataType(gen, mapType.getValueType());
+        gen.writeBooleanField("valueContainsNull", mapType.isValueContainsNull());
+        gen.writeEndObject();
+    }
+
+    private static void writeStructType(JsonGenerator gen, StructType structType) throws IOException {
+        gen.writeStartObject();
+        gen.writeStringField("type", "struct");
+        gen.writeArrayFieldStart("fields");
+        for (StructField field : structType.fields()) {
+            writeStructField(gen, field);
+        }
+        gen.writeEndArray();
+        gen.writeEndObject();
+    }
+
+    private static void writeStructField(JsonGenerator gen, StructField field) throws IOException {
+        gen.writeStartObject();
+        gen.writeStringField("name", field.getName());
+        gen.writeFieldName("type");
+        writeDataType(gen, field.getDataType());
+        gen.writeBooleanField("nullable", field.isNullable());
+        gen.writeFieldName("metadata");
+        writeFieldMetadata(gen, field.getMetadata());
+        gen.writeEndObject();
+    }
+
+    private static void writeFieldMetadata(JsonGenerator gen, FieldMetadata metadata) throws IOException {
+        gen.writeStartObject();
+        for (Map.Entry<String, Object> entry : metadata.getEntries().entrySet()) {
+            gen.writeFieldName(entry.getKey());
+            Object value = entry.getValue();
+            if (value instanceof Long) {
+                gen.writeNumber((Long) value);
+            } else if (value instanceof Double) {
+                gen.writeNumber((Double) value);
+            } else if (value instanceof Boolean) {
+                gen.writeBoolean((Boolean) value);
+            } else if (value instanceof String) {
+                gen.writeString((String) value);
+            } else if (value instanceof FieldMetadata) {
+                writeFieldMetadata(gen, (FieldMetadata) value);
+            } else if (value instanceof Long[]) {
+                gen.writeStartArray();
+                for (Long v : (Long[]) value) {
+                    gen.writeNumber(v);
+                }
+                gen.writeEndArray();
+            } else if (value instanceof Double[]) {
+                gen.writeStartArray();
+                for (Double v : (Double[]) value) {
+                    gen.writeNumber(v);
+                }
+                gen.writeEndArray();
+            } else if (value instanceof Boolean[]) {
+                gen.writeStartArray();
+                for (Boolean v : (Boolean[]) value) {
+                    gen.writeBoolean(v);
+                }
+                gen.writeEndArray();
+            } else if (value instanceof String[]) {
+                gen.writeStartArray();
+                for (String v : (String[]) value) {
+                    gen.writeString(v);
+                }
+                gen.writeEndArray();
+            } else if (value instanceof FieldMetadata[]) {
+                gen.writeStartArray();
+                for (FieldMetadata v : (FieldMetadata[]) value) {
+                    writeFieldMetadata(gen, v);
+                }
+                gen.writeEndArray();
+            } else if (value == null) {
+                gen.writeNull();
+            } else {
+                throw new IllegalArgumentException(format("Unsupported type for field metadata value: %s", value));
+            }
+        }
+        gen.writeEndObject();
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
new file mode 100644
index 0000000..558f8a9
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+/**
+ * Delta record reader.
+ * The reader returns records in Delta Kernel Row format.
+ */
+public class DeltaFileRecordReader implements IRecordReader<Row> {
+
+    private Engine engine;
+    private List<Row> scanFiles;
+    private Row scanState;
+    protected IRawRecord<Row> record;
+    protected VoidPointable value = null;
+    private FileStatus fileStatus;
+    private StructType physicalReadSchema;
+    private CloseableIterator<ColumnarBatch> physicalDataIter;
+    private CloseableIterator<FilteredColumnarBatch> dataIter;
+    private int fileIndex;
+    private Row scanFile;
+    private CloseableIterator<Row> rows;
+
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
+            IExternalDataRuntimeContext context) {
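+        // Rebuild the S3-backed Hadoop configuration on the reader side, then deserialize the
+        // scan state and this partition's share of scan files shipped from the factory.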
+        Configuration config = new Configuration();
+        config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+            config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        }
+        config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
+        String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint != null) {
+            config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }
+        this.engine = DefaultEngine.create(config);
+        this.scanFiles = new ArrayList<>();
+        for (String scanFile : serScanFiles) {
+            this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
+        }
+        this.scanState = RowSerDe.deserializeRowFromJson(serScanState);
+        this.fileStatus = null;
+        this.physicalReadSchema = null;
+        this.physicalDataIter = null;
+        this.dataIter = null;
+        this.record = new GenericRecord<>();
+        if (!scanFiles.isEmpty()) {
+            this.fileIndex = 0;
+            this.scanFile = scanFiles.get(0);
+            this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+            this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            try {
+                this.physicalDataIter = engine.getParquetHandler()
+                        .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty());
+                this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+                if (dataIter.hasNext()) {
+                    rows = dataIter.next().getRows();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (dataIter != null) {
+            dataIter.close();
+        }
+        if (physicalDataIter != null) {
+            physicalDataIter.close();
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
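+        // Three nested cursors: rows within the current batch, batches within the current
+        // scan file, and the scan files themselves; recurse after refilling a cursor.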
+        if (rows != null && rows.hasNext()) {
+            return true;
+        } else if (dataIter != null && dataIter.hasNext()) {
+            rows = dataIter.next().getRows();
+            return this.hasNext();
+        } else if (fileIndex < scanFiles.size() - 1) {
+            fileIndex++;
+            scanFile = scanFiles.get(fileIndex);
+            fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+            physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+                    physicalReadSchema, Optional.empty());
+            dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            return this.hasNext();
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public IRawRecord<Row> next() throws IOException, InterruptedException {
+        Row row = rows.next();
+        record.set(row);
+        return record;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+
+    }
+
+    @Override
+    public void setFeedLogManager(IFeedLogManager feedLogManager) throws HyracksDataException {
+
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java
new file mode 100644
index 0000000..d91cd9a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/RowSerDe.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Utility class to serialize and deserialize a {@link Row} object.
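+ *
+ * <p>The JSON form carries the schema alongside the values, e.g. (for a hypothetical
+ * single-column row): {@code {"schema":"{\"type\":\"struct\",...}","row":{"id":1}}}.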
+ */
+public class RowSerDe {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private RowSerDe() {
+    }
+
+    /**
+     * Utility method to serialize a {@link Row} as a JSON string
+     */
+    public static String serializeRowToJson(Row row) {
+        Map<String, Object> rowObject = convertRowToJsonObject(row);
+        try {
+            Map<String, Object> rowWithSchema = new HashMap<>();
+            rowWithSchema.put("schema", row.getSchema().toJson());
+            rowWithSchema.put("row", rowObject);
+            return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
+        } catch (JsonProcessingException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Utility method to deserialize a {@link Row} object from the JSON form.
+     */
+    public static Row deserializeRowFromJson(String jsonRowWithSchema) {
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
+            JsonNode schemaNode = jsonNode.get("schema");
+            StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaNode.asText());
+            return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
+        } catch (JsonProcessingException ex) {
+            throw new UncheckedIOException(ex);
+        }
+    }
+
+    private static Map<String, Object> convertRowToJsonObject(Row row) {
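+        // Box each field into a plain Java value keyed by field name, recursing into structs.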
+        StructType rowType = row.getSchema();
+        Map<String, Object> rowObject = new HashMap<>();
+        for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
+            StructField field = rowType.at(fieldId);
+            DataType fieldType = field.getDataType();
+            String name = field.getName();
+
+            if (row.isNullAt(fieldId)) {
+                rowObject.put(name, null);
+                continue;
+            }
+
+            Object value;
+            if (fieldType instanceof BooleanType) {
+                value = row.getBoolean(fieldId);
+            } else if (fieldType instanceof ByteType) {
+                value = row.getByte(fieldId);
+            } else if (fieldType instanceof ShortType) {
+                value = row.getShort(fieldId);
+            } else if (fieldType instanceof IntegerType) {
+                value = row.getInt(fieldId);
+            } else if (fieldType instanceof LongType) {
+                value = row.getLong(fieldId);
+            } else if (fieldType instanceof FloatType) {
+                value = row.getFloat(fieldId);
+            } else if (fieldType instanceof DoubleType) {
+                value = row.getDouble(fieldId);
+            } else if (fieldType instanceof DateType) {
+                value = row.getInt(fieldId);
+            } else if (fieldType instanceof TimestampType) {
+                value = row.getLong(fieldId);
+            } else if (fieldType instanceof StringType) {
+                value = row.getString(fieldId);
+            } else if (fieldType instanceof ArrayType) {
+                value = VectorUtils.toJavaList(row.getArray(fieldId));
+            } else if (fieldType instanceof MapType) {
+                value = VectorUtils.toJavaMap(row.getMap(fieldId));
+            } else if (fieldType instanceof StructType) {
+                Row subRow = row.getStruct(fieldId);
+                value = convertRowToJsonObject(subRow);
+            } else {
+                throw new UnsupportedOperationException("NYI");
+            }
+
+            rowObject.put(name, value);
+        }
+
+        return rowObject;
+    }
+
+    private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType) {
+        return new DefaultJsonRow(rowJsonNode, rowType);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
new file mode 100644
index 0000000..81e465c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta.converter;
+
+import java.io.DataOutput;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
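+/**
+ * Conversion options for Delta records (decimal-to-double, timestamp-as-long, date-as-int,
+ * timezone offset) plus reusable date/datetime serializers.
+ */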
+public class DeltaConverterContext extends ParserContext {
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADate> dateSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+    private final boolean decimalToDouble;
+    private final boolean timestampAsLong;
+    private final boolean dateAsInt;
+
+    private final int timeZoneOffset;
+    private final AMutableDate mutableDate = new AMutableDate(0);
+    private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+
+    public DeltaConverterContext(Map<String, String> configuration) {
+        decimalToDouble = Boolean.parseBoolean(configuration
+                .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
+        timestampAsLong = Boolean.parseBoolean(configuration
+                .getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, ExternalDataConstants.TRUE));
+        dateAsInt = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, ExternalDataConstants.TRUE));
+        String configuredTimeZoneId = configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE);
+        if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+            timeZoneOffset = TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+        } else {
+            timeZoneOffset = 0;
+        }
+    }
+
+    public void serializeDate(int value, DataOutput output) {
+        try {
+            mutableDate.setValue(value);
+            dateSerDer.serialize(mutableDate, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeDateTime(long timestamp, DataOutput output) {
+        try {
+            mutableDateTime.setValue(timestamp);
+            datetimeSerDer.serialize(mutableDateTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public boolean isDecimalToDoubleEnabled() {
+        return decimalToDouble;
+    }
+
+    public int getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    public boolean isTimestampAsLong() {
+        return timestampAsLong;
+    }
+
+    public boolean isDateAsInt() {
+        return dateAsInt;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
new file mode 100644
index 0000000..ea02d77
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
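+/**
+ * Parses Delta Kernel {@link Row} records into ADM values, embedding any external filter
+ * values supplied by the runtime context.
+ */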
+public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> {
+    private final DeltaConverterContext parserContext;
+    private final IExternalFilterValueEmbedder valueEmbedder;
+
+    public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
+        parserContext = new DeltaConverterContext(conf);
+        valueEmbedder = context.getValueEmbedder();
+    }
+
+    @Override
+    public boolean parse(IRawRecord<? extends Row> record, DataOutput out) throws HyracksDataException {
+        try {
+            parseObject(record.get(), out);
+            valueEmbedder.reset();
+            return true;
+        } catch (IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+    }
+
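+    /**
+     * Parses a top-level Delta row into an ADM record. Fields for which the external
+     * filter value embedder supplies a value are embedded directly instead of being
+     * parsed from the row.
+     */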
+    private void parseObject(Row record, DataOutput out) throws IOException {
+        IMutableValueStorage valueBuffer = parserContext.enterObject();
+        IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        StructType schema = record.getSchema();
+        valueEmbedder.enterObject();
+        for (int i = 0; i < schema.fields().size(); i++) {
+            DataType fieldSchema = schema.fields().get(i).getDataType();
+            String fieldName = schema.fieldNames().get(i);
+            ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i));
+            IValueReference value = null;
+            if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+                value = valueEmbedder.getEmbeddedValue();
+            } else {
+                valueBuffer.reset();
+                parseValue(fieldSchema, record, i, valueBuffer.getDataOutput());
+                value = valueBuffer;
+            }
+
+            if (value != null) {
+                // Ignore missing values
+                objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+            }
+        }
+
+        embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+        objectBuilder.write(out, true);
+        valueEmbedder.exitObject();
+        parserContext.exitObject(valueBuffer, null, objectBuilder);
+    }
+
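+    /**
+     * Parses a nested struct stored column-wise: each child vector of {@code column}
+     * holds one field of the struct, and {@code index} selects the current row.
+     */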
+    private void parseObject(StructType schema, ColumnVector column, int index, DataOutput out) throws IOException {
+        IMutableValueStorage valueBuffer = parserContext.enterObject();
+        IARecordBuilder objectBuilder = parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        valueEmbedder.enterObject();
+        for (int i = 0; i < schema.fields().size(); i++) {
+            DataType fieldSchema = schema.fields().get(i).getDataType();
+            String fieldName = schema.fieldNames().get(i);
+            ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index));
+            IValueReference value = null;
+            if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+                value = valueEmbedder.getEmbeddedValue();
+            } else {
+                valueBuffer.reset();
+                parseValue(fieldSchema, column.getChild(i), index, valueBuffer.getDataOutput());
+                value = valueBuffer;
+            }
+
+            if (value != null) {
+                // Ignore missing values
+                objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+            }
+        }
+
+        embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+        objectBuilder.write(out, true);
+        valueEmbedder.exitObject();
+        parserContext.exitObject(valueBuffer, null, objectBuilder);
+    }
+
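+    /**
+     * Parses an array value into an ADM ordered list, parsing every element with the
+     * element type declared by {@code arraySchema}.
+     */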
+    private void parseArray(ArrayType arraySchema, ArrayValue arrayValue, DataOutput out) throws IOException {
+        DataType elementSchema = arraySchema.getElementType();
+        ColumnVector elements = arrayValue.getElements();
+        final IMutableValueStorage valueBuffer = parserContext.enterCollection();
+        final IAsterixListBuilder arrayBuilder =
+                parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+        for (int i = 0; i < elements.getSize(); i++) {
+            valueBuffer.reset();
+            parseValue(elementSchema, elements, i, valueBuffer.getDataOutput());
+            arrayBuilder.addItem(valueBuffer);
+        }
+        arrayBuilder.write(out, true);
+        parserContext.exitCollection(valueBuffer, arrayBuilder);
+    }
+
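+    /**
+     * Maps a Delta {@link DataType} to the ADM type tag used for value-embedding
+     * decisions; a null value always maps to {@link ATypeTag#NULL}.
+     */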
+    private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException {
+        if (isNull) {
+            return ATypeTag.NULL;
+        }
+        if (schema instanceof BooleanType) {
+            return ATypeTag.BOOLEAN;
+        } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) {
+            return ATypeTag.BIGINT;
+        } else if (schema instanceof DoubleType) {
+            return ATypeTag.DOUBLE;
+        } else if (schema instanceof StringType) {
+            return ATypeTag.STRING;
+        } else if (schema instanceof DateType) {
+            if (parserContext.isDateAsInt()) {
+                return ATypeTag.INTEGER;
+            }
+            return ATypeTag.DATE;
+        } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) {
+            if (parserContext.isTimestampAsLong()) {
+                return ATypeTag.BIGINT;
+            }
+            return ATypeTag.DATETIME;
+        } else if (schema instanceof BinaryType) {
+            return ATypeTag.BINARY;
+        } else if (schema instanceof ArrayType) {
+            return ATypeTag.ARRAY;
+        } else if (schema instanceof StructType) {
+            return ATypeTag.OBJECT;
+        } else if (schema instanceof DecimalType) {
+            ensureDecimalToDoubleEnabled(schema, parserContext);
+            return ATypeTag.DOUBLE;
+        } else {
+            throw createUnsupportedException(schema);
+        }
+    }
+
+    private void parseValue(DataType schema, Row row, int index, DataOutput out) throws IOException {
+        if (row.isNullAt(index)) {
+            nullSerde.serialize(ANull.NULL, out);
+        } else if (schema instanceof BooleanType) {
+            if (row.getBoolean(index)) {
+                booleanSerde.serialize(ABoolean.TRUE, out);
+            } else {
+                booleanSerde.serialize(ABoolean.FALSE, out);
+            }
+        } else if (schema instanceof ShortType) {
+            serializeLong(row.getShort(index), out);
+        } else if (schema instanceof IntegerType) {
+            serializeLong(row.getInt(index), out);
+        } else if (schema instanceof LongType) {
+            serializeLong(row.getLong(index), out);
+        } else if (schema instanceof DoubleType) {
+            serializeDouble(row.getDouble(index), out);
+        } else if (schema instanceof StringType) {
+            serializeString(row.getString(index), out);
+        } else if (schema instanceof DateType) {
+            if (parserContext.isDateAsInt()) {
+                serializeLong(row.getInt(index), out);
+            } else {
+                parserContext.serializeDate(row.getInt(index), out);
+            }
+        } else if (schema instanceof TimestampType) {
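+            // Delta stores timestamps as microseconds since the epoch, adjusted to UTC;
+            // convert to milliseconds and apply the configured time zone offset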
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+            int offset = parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis + offset, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis + offset, out);
+            }
+        } else if (schema instanceof TimestampNTZType) {
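+            // Timestamp without time zone: no offset is applied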
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
+        } else if (schema instanceof StructType) {
+            parseObject(row.getStruct(index), out);
+        } else if (schema instanceof ArrayType) {
+            parseArray((ArrayType) schema, row.getArray(index), out);
+        } else if (schema instanceof DecimalType) {
+            serializeDecimal(row.getDecimal(index), out);
+        } else {
+            throw createUnsupportedException(schema);
+        }
+    }
+
+    private void parseValue(DataType schema, ColumnVector column, int index, DataOutput out) throws IOException {
+        if (column.isNullAt(index)) {
+            nullSerde.serialize(ANull.NULL, out);
+        } else if (schema instanceof BooleanType) {
+            if (column.getBoolean(index)) {
+                booleanSerde.serialize(ABoolean.TRUE, out);
+            } else {
+                booleanSerde.serialize(ABoolean.FALSE, out);
+            }
+        } else if (schema instanceof ShortType) {
+            serializeLong(column.getShort(index), out);
+        } else if (schema instanceof IntegerType) {
+            serializeLong(column.getInt(index), out);
+        } else if (schema instanceof LongType) {
+            serializeLong(column.getLong(index), out);
+        } else if (schema instanceof DoubleType) {
+            serializeDouble(column.getDouble(index), out);
+        } else if (schema instanceof StringType) {
+            serializeString(column.getString(index), out);
+        } else if (schema instanceof DateType) {
+            if (parserContext.isDateAsInt()) {
+                serializeLong(column.getInt(index), out);
+            } else {
+                parserContext.serializeDate(column.getInt(index), out);
+            }
+        } else if (schema instanceof TimestampType) {
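+            // Microseconds since the epoch, adjusted to UTC; convert to milliseconds
+            // and apply the configured time zone offset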
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+            int offset = parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis + offset, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis + offset, out);
+            }
+        } else if (schema instanceof TimestampNTZType) {
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
+        } else if (schema instanceof ArrayType) {
+            parseArray((ArrayType) schema, column.getArray(index), out);
+        } else if (schema instanceof StructType) {
+            parseObject((StructType) schema, column, index, out);
+        } else if (schema instanceof DecimalType) {
+            serializeDecimal(column.getDecimal(index), out);
+        } else {
+            throw createUnsupportedException(schema);
+        }
+    }
+
+    private void serializeLong(Object value, DataOutput out) throws HyracksDataException {
+        long longValue = ((Number) value).longValue();
+        aInt64.setValue(longValue);
+        int64Serde.serialize(aInt64, out);
+    }
+
+    private void serializeDouble(Object value, DataOutput out) throws HyracksDataException {
+        double doubleValue = ((Number) value).doubleValue();
+        aDouble.setValue(doubleValue);
+        doubleSerde.serialize(aDouble, out);
+    }
+
+    private void serializeString(Object value, DataOutput out) throws HyracksDataException {
+        aString.setValue(value.toString());
+        stringSerde.serialize(aString, out);
+    }
+
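+    /**
+     * Widens a decimal to a double; digits beyond double precision are lost.
+     */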
+    private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException {
+        serializeDouble(value.doubleValue(), out);
+    }
+
+    private static HyracksDataException createUnsupportedException(DataType schema) {
+        return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString());
+    }
+
+    private static void ensureDecimalToDoubleEnabled(DataType type, DeltaConverterContext context)
+            throws RuntimeDataException {
+        if (!context.isDecimalToDoubleEnabled()) {
+            throw new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(),
+                    ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
new file mode 100644
index 0000000..7f28105
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.DeltaDataParser;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+
+import io.delta.kernel.data.Row;
+
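+/**
+ * Parser factory for the Delta table format; creates {@link DeltaDataParser} instances
+ * that consume Delta Kernel {@link Row} records. Registered through the
+ * {@code IDataParserFactory} service loader.
+ */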
+public class DeltaTableDataParserFactory extends AbstractGenericDataParserFactory<Row> {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> PARSER_FORMAT = List.of(ExternalDataConstants.FORMAT_DELTA);
+
+    @Override
+    public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
+        throw new UnsupportedOperationException("Stream parser is not supported");
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        // no MetaType to set.
+    }
+
+    @Override
+    public List<String> getParserFormats() {
+        return PARSER_FORMAT;
+    }
+
+    @Override
+    public IRecordDataParser<Row> createRecordParser(IExternalDataRuntimeContext context) {
+        return createParser(context);
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return Row.class;
+    }
+
+    private DeltaDataParser createParser(IExternalDataRuntimeContext context) {
+        return new DeltaDataParser(context, configuration);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
index 97b04a4..5f0c018 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/StreamRecordReaderProvider.java
@@ -32,6 +32,7 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaFileRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.AvroRecordReader;
 import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -111,8 +112,9 @@
         if (format != null) {
             if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_AVRO)) {
                 return AvroRecordReader.class;
-            }
-            if (recordReaders.containsKey(format)) {
+            } else if (format.equalsIgnoreCase(ExternalDataConstants.FORMAT_DELTA)) {
+                return DeltaFileRecordReader.class;
+            } else if (recordReaders.containsKey(format)) {
                 return findRecordReaderClazzWithConfig(configuration, format);
             }
             throw new AsterixException(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, format);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 2bc65f2..202e131 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -20,7 +20,6 @@
 
 import java.util.List;
 import java.util.Set;
-import java.util.TimeZone;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -390,6 +389,16 @@
         WRITER_SUPPORTED_QUOTES = List.of(DEFAULT_QUOTE, DEFAULT_SINGLE_QUOTE, NONE);
     }
 
+    public static class DeltaOptions {
+        private DeltaOptions() {
+        }
+
+        /** Parse Delta decimal values as doubles (may lose precision) */
+        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+        /** Parse Delta timestamp values as bigints (epoch milliseconds) */
+        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+        /** Parse Delta date values as integers */
+        public static final String DATE_AS_INT = "date-to-int";
+        /** Time zone id used when converting timestamp values */
+        public static final String TIMEZONE = "timezone";
+    }
+
     public static class ParquetOptions {
         private ParquetOptions() {
         }
@@ -419,10 +428,5 @@
          */
         public static final String TIMEZONE = "timezone";
         public static final String HADOOP_TIMEZONE = ASTERIX_HADOOP_PREFIX + TIMEZONE;
-
-        /**
-         * Valid time zones that are supported by Java
-         */
-        public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 6ba618d..950f3e2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -49,6 +49,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -109,11 +111,9 @@
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 
-import io.delta.standalone.DeltaLog;
-import io.delta.standalone.Snapshot;
-import io.delta.standalone.actions.AddFile;
-
 public class ExternalDataUtils {
+
+    private static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
@@ -476,8 +476,8 @@
 
         if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) {
             if (isDeltaTable(configuration)) {
-                configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
-                configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_PARQUET);
+                configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+                configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_DELTA);
             }
             prepareTableFormat(configuration);
         }
@@ -499,23 +499,11 @@
             throw new CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT,
                     configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
-    }
-
-    public static void prepareDeltaTableFormat(Map<String, String> configuration, Configuration conf,
-            String tableMetadataPath) {
-        DeltaLog deltaLog = DeltaLog.forTable(conf, tableMetadataPath);
-        Snapshot snapshot = deltaLog.snapshot();
-        List<AddFile> dataFiles = snapshot.getAllFiles();
-        StringBuilder builder = new StringBuilder();
-        for (AddFile batchFile : dataFiles) {
-            builder.append(",");
-            String path = batchFile.getPath();
-            builder.append(tableMetadataPath).append('/').append(path);
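+        // Ensure the configured time zone id is valid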
+        if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)
+                && !VALID_TIME_ZONES.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE))) {
+            throw new CompilationException(ErrorCode.INVALID_TIMEZONE,
+                    configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
         }
-        if (builder.length() > 0) {
-            builder.deleteCharAt(0);
-        }
-        configuration.put(ExternalDataConstants.KEY_PATH, builder.toString());
     }
 
     public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
@@ -964,7 +952,7 @@
             if (datasetRecordType.getFieldTypes().length != 0) {
                 throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
             } else if (properties.containsKey(ParquetOptions.TIMEZONE)
-                    && !ParquetOptions.VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) {
+                    && !VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) {
                 //Ensure the configured time zone id is correct
                 throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE));
             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 50c75ff..5274c44 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -103,14 +103,17 @@
 
             try {
                 builder.setCredentials(GoogleCredentials.getApplicationDefault());
-            } catch (IOException ex) {
+            } catch (Exception ex) {
                 throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
             }
         } else if (jsonCredentials != null) {
             try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
                 builder.setCredentials(GoogleCredentials.fromStream(credentialsStream));
             } catch (IOException ex) {
-                throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+            } catch (Exception ex) {
+                throw new CompilationException(EXTERNAL_SOURCE_ERROR,
+                        "Encountered an issue while processing the JSON credentials. "
+                                + "Please ensure the provided credentials are valid.");
             }
         } else {
             builder.setCredentials(NoCredentials.getInstance());
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index bbf5195..793516c 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -22,4 +22,5 @@
 org.apache.asterix.external.parser.factory.RSSParserFactory
 org.apache.asterix.external.parser.factory.TweetParserFactory
 org.apache.asterix.external.parser.factory.NoOpDataParserFactory
-org.apache.asterix.external.parser.factory.AvroDataParserFactory
\ No newline at end of file
+org.apache.asterix.external.parser.factory.AvroDataParserFactory
+org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
\ No newline at end of file