[ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS)

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-64376

Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index d29cd40..ba0d0f4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,245 +18,34 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
 
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.hadoop.mapred.JobConf;
 
-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.exceptions.KernelException;
-import io.delta.kernel.internal.InternalScanFileUtils;
-import io.delta.kernel.types.StructType;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.kernel.utils.FileStatus;
-
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
-
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory {
     private static final long serialVersionUID = 1L;
-    private static final List<String> recordReaderNames =
+    private static final List<String> RECORD_READER_NAMES =
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
-    private static final Logger LOGGER = LogManager.getLogger();
-    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-    private String scanState;
-    private Map<String, String> configuration;
-    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return locationConstraints;
+    protected void configureJobConf(JobConf conf, Map<String, String> configuration) {
+        configureAwsS3HdfsJobConf(conf, configuration);
     }
 
     @Override
-    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
-            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
-            throws AlgebricksException, HyracksDataException {
-        this.configuration = configuration;
-        Configuration conf = new Configuration();
-        applyConfiguration(configuration, conf);
-        String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
-        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
-
-        Engine engine = DefaultEngine.create(conf);
-        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
-        Snapshot snapshot;
-        try {
-            snapshot = table.getLatestSnapshot(engine);
-        } catch (KernelException e) {
-            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
-            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
-        }
-
-        List<Warning> warnings = new ArrayList<>();
-        DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
-        AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
-        StructType requiredSchema;
-        try {
-            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
-            Map<String, FunctionCallInformation> functionCallInformationMap =
-                    HDFSUtils.getFunctionCallInformationMap(conf);
-            StructType fileSchema = snapshot.getSchema(engine);
-            requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } catch (AsterixDeltaRuntimeException e) {
-            throw e.getHyracksDataException();
-        }
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
-        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
-        CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
-
-        List<Row> scanFiles = new ArrayList<>();
-        while (iter.hasNext()) {
-            FilteredColumnarBatch batch = iter.next();
-            CloseableIterator<Row> rowIter = batch.getRows();
-            while (rowIter.hasNext()) {
-                Row row = rowIter.next();
-                scanFiles.add(row);
-            }
-        }
-        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
-        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles(scanFiles);
-        issueWarnings(warnings, warningCollector);
-    }
-
-    private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
-        if (!warnings.isEmpty()) {
-            for (Warning warning : warnings) {
-                if (warningCollector.shouldWarn()) {
-                    warningCollector.warn(warning);
-                }
-            }
-        }
-        warnings.clear();
-    }
-
-    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
-            List<Row> scanFiles) {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-
-        String[] locations = csm.getClusterLocations().getLocations();
-        if (scanFiles.size() == 0) {
-            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
-        } else if (locations.length > scanFiles.size()) {
-            LOGGER.debug(
-                    "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count",
-                    locations.length, scanFiles.size());
-            final String[] locationCopy = locations.clone();
-            ArrayUtils.shuffle(locationCopy);
-            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations);
-    }
-
-    private void distributeFiles(List<Row> scanFiles) {
-        final int partitionsCount = getPartitionConstraint().getLocations().length;
-        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
-                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
-        // Prepare the workloads based on the number of partitions
-        for (int i = 0; i < partitionsCount; i++) {
-            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
-        }
-        for (Row scanFileRow : scanFiles) {
-            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
-            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
-            workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
-            workloadQueue.add(workload);
-        }
-        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
-    }
-
-    public static void applyConfiguration(Map<String, String> configuration, Configuration conf) {
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
-    }
-
-    @Override
-    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
-        try {
-            int partition = context.getPartition();
-            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    configuration, context);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public Class<?> getRecordClass() throws AsterixException {
-        return Row.class;
+    protected String getTablePath(Map<String, String> configuration) {
+        return S3Utils.getPath(configuration);
     }
 
     @Override
     public List<String> getRecordReaderNames() {
-        return recordReaderNames;
-    }
-
-    @Override
-    public Set<String> getReaderSupportedFormats() {
-        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
-    }
-
-    public static class PartitionWorkLoadBasedOnSize implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final List<String> scanFiles = new ArrayList<>();
-        private long totalSize = 0;
-
-        public PartitionWorkLoadBasedOnSize() {
-        }
-
-        public List<String> getScanFiles() {
-            return scanFiles;
-        }
-
-        public void addScanFile(String scanFile, long size) {
-            this.scanFiles.add(scanFile);
-            this.totalSize += size;
-        }
-
-        public long getTotalSize() {
-            return totalSize;
-        }
-
-        @Override
-        public String toString() {
-            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
-        }
+        return RECORD_READER_NAMES;
     }
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a5b21b6..a094c22 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,24 +19,21 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 import io.delta.kernel.Scan;
 import io.delta.kernel.data.ColumnarBatch;
@@ -69,20 +66,10 @@
     private Row scanFile;
     private CloseableIterator<Row> rows;
 
-    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
-            IExternalDataRuntimeContext context) {
-        Configuration config = new Configuration();
-        config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        this.engine = DefaultEngine.create(config);
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config)
+            throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
new file mode 100644
index 0000000..dc4c310
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+    private String scanState;
+    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+    protected ConfFactory confFactory;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return locationConstraints;
+    }
+
+    protected abstract void configureJobConf(JobConf conf, Map<String, String> configuration)
+            throws AlgebricksException;
+
+    protected abstract String getTablePath(Map<String, String> configuration) throws AlgebricksException;
+
+    /** Loads the table's latest snapshot, clips the read schema, and assigns the scan files to partitions. */
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        JobConf conf = new JobConf();
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        configureJobConf(conf, configuration);
+        confFactory = new ConfFactory(conf);
+        String tableMetadataPath = getTablePath(configuration);
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        Snapshot snapshot;
+        try {
+            snapshot = table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+
+        List<Warning> warnings = new ArrayList<>();
+        DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+        AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+        StructType requiredSchema;
+        try {
+            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+            Map<String, FunctionCallInformation> callInfo = HDFSUtils.getFunctionCallInformationMap(conf);
+            StructType fileSchema = snapshot.getSchema(engine);
+            requiredSchema = visitor.clipType(expectedType, fileSchema, callInfo);
+        } catch (IOException e) {
+            // report I/O failures through the declared HyracksDataException, not an unchecked RuntimeException
+            throw HyracksDataException.create(e);
+        } catch (AsterixDeltaRuntimeException e) {
+            throw e.getHyracksDataException();
+        }
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+        List<Row> scanFiles = new ArrayList<>();
+        // both iterators are closeable: release them even when iteration fails part-way
+        try (CloseableIterator<FilteredColumnarBatch> batches = scan.getScanFiles(engine)) {
+            while (batches.hasNext()) {
+                try (CloseableIterator<Row> rows = batches.next().getRows()) {
+                    rows.forEachRemaining(scanFiles::add);
+                }
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
+        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+        distributeFiles(scanFiles);
+        issueWarnings(warnings, warningCollector);
+    }
+
+    // Forwards each collected warning to the collector, asking shouldWarn() before every one,
+    // and then empties the list.
+    private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+        for (Warning pending : warnings) {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(pending);
+            }
+        }
+        warnings.clear();
+    }
+
+    // Returns one random node when there are no scan files; otherwise at most one cluster node per scan file.
+    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
+            List<Row> scanFiles) {
+        IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
+        String[] locations = clusterStateManager.getClusterLocations().getLocations();
+        if (scanFiles.isEmpty()) {
+            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+        } else if (locations.length > scanFiles.size()) {
+            LOGGER.debug(
+                    "configured partitions ({}) exceeds total partition count ({}); limiting configured partitions to total partition count",
+                    locations.length, scanFiles.size());
+            String[] shuffled = locations.clone();
+            ArrayUtils.shuffle(shuffled);
+            locations = ArrayUtils.subarray(shuffled, 0, scanFiles.size());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations);
+    }
+
+    // Assigns every scan file to the partition whose accumulated size is currently the smallest.
+    private void distributeFiles(List<Row> scanFiles) {
+        final int partitionsCount = getPartitionConstraint().getLocations().length;
+        PriorityQueue<PartitionWorkLoadBasedOnSize> lightestFirst = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+        // start with one empty workload per partition
+        for (int p = 0; p < partitionsCount; p++) {
+            lightestFirst.add(new PartitionWorkLoadBasedOnSize());
+        }
+        for (Row scanFileRow : scanFiles) {
+            PartitionWorkLoadBasedOnSize lightest = lightestFirst.poll();
+            FileStatus addFile = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+            lightest.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), addFile.getSize());
+            lightestFirst.add(lightest);
+        }
+        partitionWorkLoadsBasedOnSize.addAll(lightestFirst);
+    }
+
+    @Override
+    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+        try {
+            // each partition reads only the scan files that distributeFiles assigned to its index
+            List<String> scanFiles = partitionWorkLoadsBasedOnSize.get(context.getPartition()).getScanFiles();
+            return new DeltaFileRecordReader(scanFiles, scanState, confFactory);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws AsterixException {
+        return Row.class;
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
+    }
+
+    /** Scan files (as JSON strings) assigned to one partition, together with their accumulated size. */
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final List<String> scanFiles = new ArrayList<>();
+        private long totalSize;
+
+        public List<String> getScanFiles() {
+            return scanFiles;
+        }
+
+        /** Records one more scan file and adds its size to the running total. */
+        public void addScanFile(String scanFile, long size) {
+            scanFiles.add(scanFile);
+            totalSize += size;
+        }
+
+        /** Returns the sum of the sizes passed to {@code addScanFile} so far. */
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000..ee88569
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+/** Delta reader factory for the GCS adapter; job configuration and table path are delegated to GCSUtils. */
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+    private static final long serialVersionUID = 1L;
+    private static final List<String> RECORD_READER_NAMES =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+
+    @Override
+    protected void configureJobConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
+        GCSUtils.configureHdfsJobConf(conf, configuration);
+    }
+
+    @Override
+    protected String getTablePath(Map<String, String> configuration) throws AlgebricksException {
+        return GCSUtils.getPath(configuration);
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return RECORD_READER_NAMES;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c7deb7c..6767f93 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -29,8 +29,10 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
 import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -72,14 +74,15 @@
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
-import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
 import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
 import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
@@ -90,6 +93,7 @@
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -515,21 +519,22 @@
         }
     }
 
-    public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
-        Configuration conf = new Configuration();
+    public static void validateDeltaTableExists(Map<String, String> configuration) throws AlgebricksException {
         String tableMetadataPath = null;
+        JobConf conf = new JobConf();
         if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
                 .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
-            AwsS3DeltaReaderFactory.applyConfiguration(configuration, conf);
-            tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
-                    + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
-                    + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+            configureAwsS3HdfsJobConf(conf, configuration);
+            tableMetadataPath = S3Utils.getPath(configuration);
+        } else if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
+            configureHdfsJobConf(conf, configuration);
+            tableMetadataPath = GCSUtils.getPath(configuration);
         } else {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
                     "Delta format is not supported for the external source type: "
                             + configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE));
         }
-
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
         try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f36d25d..45988e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -333,6 +333,10 @@
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
+    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration) {
+        configureAwsS3HdfsJobConf(conf, configuration, 0); // 0: the 3-arg overload then leaves the pool size unset
+    }
+
     public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
             int numberOfPartitions) {
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
@@ -371,7 +375,9 @@
         /*
          * Set the size of S3 connection pool to be the number of partitions
          */
-        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+        if (numberOfPartitions != 0) {
+            conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+        }
 
         if (serviceEndpoint != null) {
             // Validation of the URL should be done at hadoop-aws level
@@ -470,7 +476,11 @@
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
         if (isDeltaTable(configuration)) {
-            validateDeltaTableExists(configuration);
+            try {
+                validateDeltaTableExists(configuration);
+            } catch (AlgebricksException e) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a2b50e1..d8dd478 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -292,4 +292,10 @@
         allObjects.put("folders", folders);
         return allObjects;
     }
+
+    public static String getPath(Map<String, String> configuration) {
+        return S3Constants.HADOOP_S3_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 74a664d..bfd35fc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -140,9 +142,11 @@
      */
     public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
         // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
@@ -224,6 +228,11 @@
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
+    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
+            throws AlgebricksException {
+        configureHdfsJobConf(conf, configuration, 0); // delegates with numberOfPartitions = 0
+    }
+
     public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
             throws AlgebricksException {
         String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
@@ -267,4 +276,10 @@
             conf.set(HADOOP_ENDPOINT, endpoint);
         }
     }
+
+    public static String getPath(Map<String, String> configuration) {
+        return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 2c15b5a..1f25c4b 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -28,4 +28,5 @@
 org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
 org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
 org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
-org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
+org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory
\ No newline at end of file