Refactor ExternalDataUtils/Constants

Split the provider-specific nested classes (AwsS3, Azure, GCS) out of
ExternalDataUtils and ExternalDataConstants into dedicated per-provider
classes:

- org.apache.asterix.external.util.aws.s3.S3Utils / S3Constants
- org.apache.asterix.external.util.azure.blob_storage.AzureUtils / AzureConstants
- org.apache.asterix.external.util.google.gcs.GCSUtils

All call sites are updated accordingly. ExternalDataUtils.isParquetFormat
is also widened from private to public.

Change-Id: Ie1f1499f13968d421bee43ec7352aea0c2749423
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15644
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
index 156b78a..48cf511 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
@@ -20,7 +20,7 @@
 
 import static java.util.regex.Pattern.CASE_INSENSITIVE;
 import static java.util.regex.Pattern.DOTALL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
 
 import java.util.regex.Pattern;
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f14af53..bbcf9cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.input.record.reader.aws;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
@@ -32,7 +31,7 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -109,7 +108,7 @@
     }
 
     private boolean shouldRetry(String errorCode, int currentRetry) {
-        return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+        return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode);
     }
 
     @Override
@@ -134,7 +133,7 @@
 
     private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
         try {
-            return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
+            return S3Utils.buildAwsS3Client(configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
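
S3Utils.isRetryableError is presumably the relocated twin of the helper
removed from ExternalDataConstants.AwsS3 later in this change; based on the
constants that move with it, its expected shape is:

    // Presumed shape of the relocated helper, mirroring the logic removed
    // from ExternalDataConstants.AwsS3 below: only the two transient S3
    // error codes are considered retryable.
    public static boolean isRetryableError(String errorCode) {
        return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
    }

The try/catch around buildAwsS3Client keeps the established contract: the
relocated builder still throws CompilationException, which the runtime
stream wraps in HyracksDataException.
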
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 89ea39e..a241354 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -53,8 +54,7 @@
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
 
         //Get a list of S3 objects
-        List<S3Object> filesOnly =
-                ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
         // Distribute work load amongst the partitions
         distributeWorkLoad(filesOnly, getPartitionsCount());
     }
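
S3Utils.listS3Objects presumably carries over the continuation-token
pagination removed from ExternalDataUtils.AwsS3 later in this change. Its
core loop, condensed from that removed code (locals renamed for brevity;
matcher stands for the IncludeExcludeMatcher):

    // Condensed from the removed ExternalDataUtils.AwsS3.listS3Objects:
    // page through the bucket with the V2 list API and keep files only.
    ListObjectsV2Request.Builder builder =
            ListObjectsV2Request.builder().bucket(container).prefix(prefix);
    String token = null;
    boolean truncated;
    do {
        ListObjectsV2Response response = s3Client
                .listObjectsV2(token == null ? builder.build() : builder.continuationToken(token).build());
        collectAndFilterFiles(response.contents(), matcher.getPredicate(), matcher.getMatchersList(), filesOnly);
        truncated = Boolean.TRUE.equals(response.isTruncated());
        token = response.nextContinuationToken();
    } while (truncated);
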
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 803e657..ff93a46 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,6 +28,8 @@
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -52,7 +54,7 @@
         //Configure Hadoop S3 input splits
         JobConf conf = createHdfsConf(serviceCtx, configuration);
         int numberOfPartitions = getPartitionConstraint().getLocations().length;
-        ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+        S3Utils.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
         configureHdfsConf(conf, configuration);
     }
 
@@ -89,8 +91,7 @@
             throws CompilationException {
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        List<S3Object> filesOnly =
-                ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
         StringBuilder builder = new StringBuilder();
 
         if (!filesOnly.isEmpty()) {
@@ -105,7 +106,7 @@
     }
 
     private static void appendFileURI(StringBuilder builder, String container, S3Object file) {
-        builder.append(ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL);
+        builder.append(S3Constants.HADOOP_S3_PROTOCOL);
         builder.append("://");
         builder.append(container);
         builder.append('/');
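
With the constant relocated to S3Constants, appendFileURI still emits
Hadoop-style s3a paths. Assuming the object key is appended after the
trailing slash, as in the surrounding unchanged code, each entry is
equivalent to:

    // Single-object equivalent of appendFileURI, assuming
    // S3Constants.HADOOP_S3_PROTOCOL is still "s3a".
    String uri = S3Constants.HADOOP_S3_PROTOCOL + "://" + container + '/' + file.key();
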
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index cdb3834..bbfece2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.blob;
 
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
@@ -31,7 +32,6 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.LogRedactionUtil;
 
@@ -86,7 +86,7 @@
     private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
             throws HyracksDataException {
         try {
-            return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
+            return buildAzureBlobClient(appCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 064b319..55c0521 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.blob;
 
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -57,9 +60,9 @@
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
-        List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
-                includeExcludeMatcher, warningCollector);
+        BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
+        List<BlobItem> filesOnly =
+                listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);
 
         // Distribute work load amongst the partitions
         distributeWorkLoad(filesOnly, getPartitionsCount());
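
listBlobItems moves to AzureUtils as well. Its implementation is not shown
in this excerpt, but the imports removed from ExternalDataUtils below
(BlobContainerClient, ListBlobsOptions, PagedIterable) suggest a
prefix-filtered listing; a hedged sketch of its presumed core, with
predicate, matchers, prefix, and filesOnly as assumed locals:

    // Hypothetical core of AzureUtils.listBlobItems, assuming the
    // prefix-based listing implied by the imports removed from
    // ExternalDataUtils in this change.
    BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(container);
    ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
    for (BlobItem item : containerClient.listBlobs(options, null)) {
        if (predicate.test(matchers, item.getName())) {
            filesOnly.add(item);
        }
    }
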
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index e34d188..7a95222 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.datalake;
 
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
@@ -31,7 +32,6 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.LogRedactionUtil;
 
@@ -86,7 +86,7 @@
     private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
             throws HyracksDataException {
         try {
-            return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
+            return buildAzureDatalakeClient(appCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index e9f8d4c..929cb6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.datalake;
 
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -57,9 +60,9 @@
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
-        List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
-                includeExcludeMatcher, warningCollector);
+        DataLakeServiceClient client = buildAzureDatalakeClient(appCtx, configuration);
+        List<PathItem> filesOnly =
+                listDatalakePathItems(client, configuration, includeExcludeMatcher, warningCollector);
 
         // Distribute work load amongst the partitions
         distributeWorkLoad(filesOnly, getPartitionsCount());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index c2251df..e08013c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -18,6 +18,11 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.parquet;
 
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -47,7 +52,7 @@
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
         IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
-        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
+        BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
         //Get endpoint
         String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
         //Get path
@@ -57,7 +62,7 @@
 
         //Configure Hadoop Azure input splits
         JobConf conf = createHdfsConf(serviceCtx, configuration);
-        ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+        configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
     }
 
@@ -94,8 +99,8 @@
     private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
             BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
-                includeExcludeMatcher, warningCollector);
+        List<BlobItem> filesOnly =
+                listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);
 
         StringBuilder builder = new StringBuilder();
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -118,7 +123,7 @@
     }
 
     private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
-        builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL);
+        builder.append(HADOOP_AZURE_BLOB_PROTOCOL);
         builder.append("://");
         builder.append(container);
         builder.append('@');
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index db87868..c98fc8b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -18,6 +18,11 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.parquet;
 
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_DATALAKE_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -47,8 +52,7 @@
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
         IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
-        DataLakeServiceClient dataLakeServiceClient =
-                ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
+        DataLakeServiceClient dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
 
         //Get endpoint
         String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
@@ -61,7 +65,7 @@
 
         //Configure Hadoop Azure input splits
         JobConf conf = createHdfsConf(serviceCtx, configuration);
-        ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+        configureAzureHdfsJobConf(conf, configuration, endPoint);
         configureHdfsConf(conf, configuration);
     }
 
@@ -98,8 +102,8 @@
     private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
             DataLakeServiceClient dataLakeServiceClient, String endPoint) throws CompilationException {
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(dataLakeServiceClient, configuration,
-                includeExcludeMatcher, warningCollector);
+        List<PathItem> filesOnly =
+                listDatalakePathItems(dataLakeServiceClient, configuration, includeExcludeMatcher, warningCollector);
 
         StringBuilder builder = new StringBuilder();
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -122,7 +126,7 @@
     }
 
     private static void appendFileURI(StringBuilder builder, String container, String endPoint, PathItem file) {
-        builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_DATALAKE_PROTOCOL);
+        builder.append(HADOOP_AZURE_DATALAKE_PROTOCOL);
         builder.append("://");
         builder.append(container);
         builder.append('@');
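
Both Azure appendFileURI variants build container@endpoint URIs, differing
only in the relocated protocol constant: wasbs for Blob storage and abfss
for Data Lake. Assuming the item name follows the endpoint in the unchanged
remainder of the method, each entry is equivalent to:

    // Assumed result shape of the two appendFileURI variants:
    //   wasbs://<container>@<endPoint>/<blob name>   (Blob)
    //   abfss://<container>@<endPoint>/<path name>   (Data Lake)
    String uri = HADOOP_AZURE_DATALAKE_PROTOCOL + "://" + container + '@' + endPoint + '/' + file.getName();
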
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 652fa3e..007e8be 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -32,7 +32,7 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -134,7 +134,7 @@
 
     private Storage buildClient(Map<String, String> configuration) throws HyracksDataException {
         try {
-            return ExternalDataUtils.GCS.buildClient(configuration);
+            return GCSUtils.buildClient(configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
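
GCSUtils.buildClient replaces ExternalDataUtils.GCS.buildClient. Its
implementation is not part of this excerpt, but the imports removed from
ExternalDataUtils below (ServiceAccountCredentials, StorageOptions,
ByteArrayInputStream, UTF_8) imply jsonCredentials-based construction. A
hedged sketch, not the actual relocated code:

    // Hypothetical sketch of the relocated GCSUtils.buildClient, assuming it
    // keeps the jsonCredentials handling implied by the removed imports.
    public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
        StorageOptions.Builder builder = StorageOptions.newBuilder();
        if (jsonCredentials != null) {
            try (InputStream stream = new ByteArrayInputStream(jsonCredentials.getBytes(UTF_8))) {
                builder.setCredentials(ServiceAccountCredentials.fromStream(stream));
            } catch (IOException ex) {
                throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
            }
        }
        return builder.build().getService();
    }
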
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 0e7ea90..1bc51f2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.gcs;
 
 import static org.apache.asterix.external.util.ExternalDataUtils.getIncludeExcludeMatchers;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.buildClient;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.util.ArrayList;
@@ -67,7 +68,7 @@
         // Prepare to retrieve the objects
         List<Blob> filesOnly = new ArrayList<>();
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        Storage gcs = ExternalDataUtils.GCS.buildClient(configuration);
+        Storage gcs = buildClient(configuration);
         Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
         Page<Blob> items;
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index f0b9c90..429706e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -333,107 +333,4 @@
          */
         public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
     }
-
-    public static class AwsS3 {
-        private AwsS3() {
-            throw new AssertionError("do not instantiate");
-        }
-
-        public static final String REGION_FIELD_NAME = "region";
-        public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
-        public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
-        public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
-        public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
-
-        // AWS S3 specific error codes
-        public static final String ERROR_INTERNAL_ERROR = "InternalError";
-        public static final String ERROR_SLOW_DOWN = "SlowDown";
-        public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
-
-        public static boolean isRetryableError(String errorCode) {
-            return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
-        }
-
-        /*
-         * Hadoop-AWS
-         * AWS connectors for s3 and s3n are deprecated.
-         */
-        public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
-        public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
-        public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
-        public static final String HADOOP_REGION = "fs.s3a.region";
-        public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
-
-        /*
-         * Internal configurations
-         */
-        //Allows accessing directories as file system path
-        public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
-        //The number of maximum HTTP connections in connection pool
-        public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum";
-        //S3 used protocol
-        public static final String HADOOP_S3_PROTOCOL = "s3a";
-
-        //Hadoop credentials provider key
-        public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider";
-        //Anonymous credential provider
-        public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
-        //Temporary credential provider
-        public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
-
-    }
-
-    /*
-     * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties.
-     * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
-     */
-    public static class Azure {
-        private Azure() {
-            throw new AssertionError("do not instantiate");
-        }
-
-        /*
-         * Asterix Configuration Keys
-         */
-        public static final String MANAGED_IDENTITY_ID_FIELD_NAME = "managedIdentityId";
-        public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
-        public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
-        public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature";
-        public static final String TENANT_ID_FIELD_NAME = "tenantId";
-        public static final String CLIENT_ID_FIELD_NAME = "clientId";
-        public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret";
-        public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate";
-        public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
-        public static final String ENDPOINT_FIELD_NAME = "endpoint";
-
-        // Specific Azure data lake property
-        /*
-        The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example:
-        storage/myData/personal/file1.json
-        storage/myData/personal/file2.json
-        storage/myData/file3.json
-        If the prefix used is "myData", then only the file file3.json is read. However, if the property "recursive"
-        is set to "true" when creating the external dataset, then it recursively goes over all the paths, and the result
-        is file1.json, file2.json and file3.json.
-         */
-        public static final String RECURSIVE_FIELD_NAME = "recursive";
-
-        /*
-         * Hadoop-Azure
-         */
-        //Used when accountName and accessKey are provided
-        public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key";
-        //Used when a connectionString is provided
-        public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
-        public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
-        public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss";
-    }
-
-    public static class GCS {
-        private GCS() {
-            throw new AssertionError("do not instantiate");
-        }
-
-        public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
-    }
 }
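
The nested classes deleted here are not dropped: per the new imports at the
top of this change, they reappear as standalone per-provider classes. The
expected shape of S3Constants, with the fields carried over verbatim
(isRetryableError moves to S3Utils instead, as the AwsS3InputStream hunk
shows):

    package org.apache.asterix.external.util.aws.s3;

    public class S3Constants {
        private S3Constants() {
            throw new AssertionError("do not instantiate");
        }

        public static final String REGION_FIELD_NAME = "region";
        public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
        public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
        public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
        public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";

        // AWS S3 specific error codes
        public static final String ERROR_INTERNAL_ERROR = "InternalError";
        public static final String ERROR_SLOW_DOWN = "SlowDown";
        public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";

        // ... Hadoop-AWS keys (fs.s3a.*) and HADOOP_S3_PROTOCOL = "s3a",
        // exactly as removed above.
    }

AzureConstants and a GCS counterpart (not visible in this excerpt)
presumably host the Azure and GCS fields removed above in the same way.
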
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8e38eed..702ef42 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,43 +18,9 @@
  */
 package org.apache.asterix.external.util;
 
-import static com.google.cloud.storage.Storage.BlobListOption;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
-import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ACCESS_KEY_ID;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ANONYMOUS_ACCESS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_PATH_STYLE_ACCESS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SECRET_ACCESS_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_NAME_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_SECRET_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
@@ -65,18 +31,14 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
@@ -85,7 +47,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
@@ -106,20 +67,17 @@
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.runtime.evaluators.common.NumberUtils;
 import org.apache.asterix.runtime.projection.DataProjectionInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
@@ -129,46 +87,6 @@
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.util.StorageUtil;
 
-import com.azure.core.credential.AzureSasCredential;
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.identity.ClientCertificateCredentialBuilder;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.identity.ManagedIdentityCredentialBuilder;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.ListBlobsOptions;
-import com.azure.storage.common.StorageSharedKeyCredential;
-import com.azure.storage.common.policy.RequestRetryOptions;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-import com.azure.storage.file.datalake.models.ListPathsOptions;
-import com.azure.storage.file.datalake.models.PathItem;
-import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.cloud.storage.Blob;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
-import software.amazon.awssdk.services.s3.model.S3Exception;
-import software.amazon.awssdk.services.s3.model.S3Object;
-import software.amazon.awssdk.services.s3.model.S3Response;
-
 public class ExternalDataUtils {
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
@@ -604,16 +522,16 @@
 
         switch (type) {
             case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
-                AwsS3.validateProperties(configuration, srcLoc, collector);
+                S3Utils.validateProperties(configuration, srcLoc, collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
-                Azure.validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
+                validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
-                Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
+                validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
                 break;
             case KEY_ADAPTER_NAME_GCS:
-                GCS.validateProperties(configuration, srcLoc, collector);
+                validateProperties(configuration, srcLoc, collector);
                 break;
             default:
                 // Nothing needs to be done
@@ -844,7 +762,7 @@
         }
     }
 
-    private static boolean isParquetFormat(Map<String, String> properties) {
+    public static boolean isParquetFormat(Map<String, String> properties) {
         String inputFormat = properties.get(ExternalDataConstants.KEY_INPUT_FORMAT);
         return ExternalDataConstants.CLASS_NAME_PARQUET_INPUT_FORMAT.equals(inputFormat)
                 || ExternalDataConstants.INPUT_FORMAT_PARQUET.equals(inputFormat)
@@ -893,1008 +811,6 @@
         return encoder.encodeToString(byteArrayOutputStream.toByteArray());
     }
 
-    public static class AwsS3 {
-        private AwsS3() {
-            throw new AssertionError("do not instantiate");
-        }
-
-        /**
-         * Builds the S3 client using the provided configuration
-         *
-         * @param configuration properties
-         * @return S3 client
-         * @throws CompilationException CompilationException
-         */
-        public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
-            // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
-            String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-            String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-            String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
-            String regionId = configuration.get(ExternalDataConstants.AwsS3.REGION_FIELD_NAME);
-            String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
-
-            S3ClientBuilder builder = S3Client.builder();
-
-            // Credentials
-            AwsCredentialsProvider credentialsProvider;
-
-            // No auth required
-            if (accessKeyId == null) {
-                credentialsProvider = AnonymousCredentialsProvider.create();
-            } else {
-                // auth required, check for temporary or permanent credentials
-                if (sessionToken != null) {
-                    credentialsProvider = StaticCredentialsProvider
-                            .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
-                } else {
-                    credentialsProvider =
-                            StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
-                }
-            }
-
-            builder.credentialsProvider(credentialsProvider);
-
-            // Validate the region
-            List<Region> regions = S3Client.serviceMetadata().regions();
-            Optional<Region> selectedRegion =
-                    regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
-
-            if (selectedRegion.isEmpty()) {
-                throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
-            }
-            builder.region(selectedRegion.get());
-
-            // Validate the service endpoint if present
-            if (serviceEndpoint != null) {
-                try {
-                    URI uri = new URI(serviceEndpoint);
-                    try {
-                        builder.endpointOverride(uri);
-                    } catch (NullPointerException ex) {
-                        throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-                    }
-                } catch (URISyntaxException ex) {
-                    throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
-                            String.format("Invalid service endpoint %s", serviceEndpoint));
-                }
-            }
-
-            return builder.build();
-        }
-
-        /**
-         * Builds the S3 client using the provided configuration
-         *
-         * @param configuration      properties
-         * @param numberOfPartitions number of partitions in the cluster
-         */
-        public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
-                int numberOfPartitions) {
-            String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
-            String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
-            String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
-            String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
-
-            //Disable caching S3 FileSystem
-            HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
-            /*
-             * Authentication Methods:
-             * 1- Anonymous: no accessKeyId and no secretAccessKey
-             * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
-             * 3- Private: has to provide accessKeyId and secretAccessKey
-             */
-            if (accessKeyId == null) {
-                //Tells hadoop-aws it is an anonymous access
-                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
-            } else {
-                conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
-                conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
-                if (sessionToken != null) {
-                    conf.set(HADOOP_SESSION_TOKEN, sessionToken);
-                    //Tells hadoop-aws it is a temporary access
-                    conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
-                }
-            }
-
-            /*
-             * This is to allow S3 definition to have path-style form. Should always be true to match the current
-             * way we access files in S3
-             */
-            conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
-            /*
-             * Set the size of S3 connection pool to be the number of partitions
-             */
-            conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
-
-            if (serviceEndpoint != null) {
-                // Validation of the URL should be done at hadoop-aws level
-                conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-            } else {
-                //Region is ignored and buckets could be found by the central endpoint
-                conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
-            }
-        }
-
-        /**
-         * Validate external dataset properties
-         *
-         * @param configuration properties
-         * @throws CompilationException Compilation exception
-         */
-        public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
-
-            // check if the format property is present
-            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
-            }
-
-            // Both parameters should be passed, or neither should be passed (for anonymous/no auth)
-            String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-            String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-            if (accessKeyId == null || secretAccessKey == null) {
-                // If one is passed, the other is required
-                if (accessKeyId != null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
-                            ACCESS_KEY_ID_FIELD_NAME);
-                } else if (secretAccessKey != null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
-                            SECRET_ACCESS_KEY_FIELD_NAME);
-                }
-            }
-
-            validateIncludeExclude(configuration);
-
-            // Check if the bucket is present
-            S3Client s3Client = buildAwsS3Client(configuration);
-            S3Response response;
-            boolean useOldApi = false;
-            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-            String prefix = getPrefix(configuration);
-
-            try {
-                response = isBucketEmpty(s3Client, container, prefix, false);
-            } catch (S3Exception ex) {
-                // Method not implemented, try falling back to old API
-                try {
-                    // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                    if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                        useOldApi = true;
-                        response = isBucketEmpty(s3Client, container, prefix, true);
-                    } else {
-                        throw ex;
-                    }
-                } catch (SdkException ex2) {
-                    throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-                }
-            } catch (SdkException ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            } finally {
-                if (s3Client != null) {
-                    CleanupUtils.close(s3Client, null);
-                }
-            }
-
-            boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
-                    : ((ListObjectsV2Response) response).contents().isEmpty();
-            if (isEmpty && collector.shouldWarn()) {
-                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                collector.warn(warning);
-            }
-
-            // A 200 response is returned only if the bucket exists; otherwise an exception is thrown. To be
-            // safe, also check that the response is successful rather than relying on exceptions alone
-            if (!response.sdkHttpResponse().isSuccessful()) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
-            }
-        }
-
-        /**
-         * Checks for a single object in the specified bucket to determine if the bucket is empty or not.
-         *
-         * @param s3Client  s3 client
-         * @param container the container name
-         * @param prefix    Prefix to be used
-         * @param useOldApi flag whether to use the old API or not
-         * @return returns the S3 response
-         */
-        private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
-            S3Response response;
-            if (useOldApi) {
-                ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
-                listObjectsBuilder.prefix(prefix);
-                response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
-            } else {
-                ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
-                listObjectsBuilder.prefix(prefix);
-                response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
-            }
-            return response;
-        }
-
-        /**
-         * Returns the lists of S3 objects.
-         *
-         * @param configuration         properties
-         * @param includeExcludeMatcher include/exclude matchers to apply
-         */
-        public static List<S3Object> listS3Objects(Map<String, String> configuration,
-                IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector)
-                throws CompilationException {
-            // Prepare to retrieve the objects
-            List<S3Object> filesOnly;
-            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-            S3Client s3Client = buildAwsS3Client(configuration);
-            String prefix = getPrefix(configuration);
-
-            try {
-                filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
-            } catch (S3Exception ex) {
-                // New API is not implemented, try falling back to old API
-                try {
-                    // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                    if (ex.awsErrorDetails().errorCode()
-                            .equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
-                        filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
-                    } else {
-                        throw ex;
-                    }
-                } catch (SdkException ex2) {
-                    throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-                }
-            } catch (SdkException ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            } finally {
-                if (s3Client != null) {
-                    CleanupUtils.close(s3Client, null);
-                }
-            }
-
-            // Warn if no files are returned
-            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
-                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                warningCollector.warn(warning);
-            }
-
-            return filesOnly;
-        }
-
-        /**
-         * Uses the latest API to retrieve the objects from the storage.
-         *
-         * @param s3Client              S3 client
-         * @param container             container name
-         * @param prefix                definition prefix
-         * @param includeExcludeMatcher include/exclude matchers to apply
-         */
-        private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
-                IncludeExcludeMatcher includeExcludeMatcher) {
-            String newMarker = null;
-            List<S3Object> filesOnly = new ArrayList<>();
-
-            ListObjectsV2Response listObjectsResponse;
-            ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
-            listObjectsBuilder.prefix(prefix);
-
-            while (true) {
-                // List the objects from the start, or from the last marker in case of truncated result
-                if (newMarker == null) {
-                    listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
-                } else {
-                    listObjectsResponse =
-                            s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
-                }
-
-                // Collect the paths to files only
-                collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
-                        includeExcludeMatcher.getMatchersList(), filesOnly);
-
-                // Stop when the response is no longer truncated; otherwise carry the continuation token into the next request
-                if (!listObjectsResponse.isTruncated()) {
-                    break;
-                } else {
-                    newMarker = listObjectsResponse.nextContinuationToken();
-                }
-            }
-
-            return filesOnly;
-        }
-
-        /**
-         * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage
-         *
-         * @param s3Client              S3 client
-         * @param container             container name
-         * @param prefix                definition prefix
-         * @param includeExcludeMatcher include/exclude matchers to apply
-         */
-        private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
-                IncludeExcludeMatcher includeExcludeMatcher) {
-            String newMarker = null;
-            List<S3Object> filesOnly = new ArrayList<>();
-
-            ListObjectsResponse listObjectsResponse;
-            ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
-            listObjectsBuilder.prefix(prefix);
-
-            while (true) {
-                // List the objects from the start, or from the last marker in case of truncated result
-                if (newMarker == null) {
-                    listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
-                } else {
-                    listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
-                }
-
-                // Collect the paths to files only
-                collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
-                        includeExcludeMatcher.getMatchersList(), filesOnly);
-
-                // Stop if the result is complete; otherwise, save the marker for the next request
-                if (!listObjectsResponse.isTruncated()) {
-                    break;
-                } else {
-                    newMarker = listObjectsResponse.nextMarker();
-                }
-            }
-
-            return filesOnly;
-        }
-
-        /**
-         * AWS S3 returns all the objects as paths, not differentiating between folders and files. A path is
-         * considered a file if it does not end with a "/", which is the separator in a folder structure.
-         *
-         * @param s3Objects List of returned objects
-         * @param predicate predicate to test with for file filtration
-         * @param matchers  include/exclude matchers to test against
-         * @param filesOnly List containing the files only (excluding folders)
-         */
-        private static void collectAndFilterFiles(List<S3Object> s3Objects,
-                BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<S3Object> filesOnly) {
-            for (S3Object object : s3Objects) {
-                // skip folders
-                if (object.key().endsWith("/")) {
-                    continue;
-                }
-
-                // No filter, add file
-                if (predicate.test(matchers, object.key())) {
-                    filesOnly.add(object);
-                }
-            }
-        }
-    }
-
-    /*
-     * Note: Azure Blob and Azure Datalake use identical authentication, so they share the same properties.
-     * If they ever diverge, separate properties for AzureBlob and AzureDataLake will need to be created.
-     */
-    public static class Azure {
-        private Azure() {
-            throw new AssertionError("do not instantiate");
-        }
-
-        /**
-         * Builds the Azure Blob service client using the provided configuration
-         *
-         * @param configuration properties
-         * @return client
-         */
-        public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx,
-                Map<String, String> configuration) throws CompilationException {
-            String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
-            String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
-            String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
-            String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-            String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
-            String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
-            String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
-            String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
-            String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
-            String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
-            // Client builder
-            BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
-            int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
-            RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
-            builder.retryOptions(requestRetryOptions);
-
-            // Endpoint is required
-            if (endpoint == null) {
-                throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
-            }
-            builder.endpoint(endpoint);
-
-            // Shared Key
-            if (accountName != null || accountKey != null) {
-                if (accountName == null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
-                            ACCOUNT_KEY_FIELD_NAME);
-                }
-
-                if (accountKey == null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
-                            ACCOUNT_NAME_FIELD_NAME);
-                }
-
-                Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
-                        MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
-                        CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            ACCOUNT_KEY_FIELD_NAME);
-                }
-                StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
-                builder.credential(credential);
-            }
-
-            // Shared access signature
-            if (sharedAccessSignature != null) {
-                Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
-                        CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
-                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-                }
-                AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
-                builder.credential(credential);
-            }
-
-            // Managed Identity auth
-            if (managedIdentityId != null) {
-                Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
-                        CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
-                        TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            MANAGED_IDENTITY_ID_FIELD_NAME);
-                }
-                builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
-            }
-
-            // Client secret & certificate auth
-            if (clientId != null) {
-                // Both (or neither) client secret and client certificate were provided; only one is allowed
-                if ((clientSecret == null) == (clientCertificate == null)) {
-                    if (clientSecret != null) {
-                        throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
-                                CLIENT_CERTIFICATE_FIELD_NAME);
-                    } else {
-                        throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
-                                CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
-                    }
-                }
-
-                // Tenant ID is required
-                if (tenantId == null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
-                            CLIENT_ID_FIELD_NAME);
-                }
-
-                // Client certificate password is not allowed if client secret is used
-                if (clientCertificatePassword != null && clientSecret != null) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
-                            CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
-                }
-
-                // Use AD authentication
-                if (clientSecret != null) {
-                    ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
-                    secret.clientId(clientId);
-                    secret.tenantId(tenantId);
-                    secret.clientSecret(clientSecret);
-                    builder.credential(secret.build());
-                } else {
-                    // Certificate
-                    ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
-                    certificate.clientId(clientId);
-                    certificate.tenantId(tenantId);
-                    try {
-                        InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
-                        if (clientCertificatePassword == null) {
-                            Method pemCertificate = ClientCertificateCredentialBuilder.class
-                                    .getDeclaredMethod("pemCertificate", InputStream.class);
-                            pemCertificate.setAccessible(true);
-                            pemCertificate.invoke(certificate, certificateContent);
-                        } else {
-                            Method pemCertificate = ClientCertificateCredentialBuilder.class
-                                    .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
-                            pemCertificate.setAccessible(true);
-                            pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
-                        }
-                    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
-                        throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex.getMessage());
-                    }
-                    builder.credential(certificate.build());
-                }
-            }
-
-            // If client id is not present, ensure client secret, certificate, tenant id and client certificate
-            // password are not present
-            if (clientId == null) {
-                Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
-                        CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-                }
-            }
-
-            try {
-                return builder.buildClient();
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-        }
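
For reference, a minimal configuration exercising the SAS branch of the builder above could look like the
following sketch; the endpoint and token values are placeholders, and appCtx is assumed to be an available
IApplicationContext:

    // Hedged sketch: SAS authentication only needs the required endpoint plus the signature
    Map<String, String> configuration = new HashMap<>();
    configuration.put(ENDPOINT_FIELD_NAME, "https://myaccount.blob.core.windows.net");
    configuration.put(SHARED_ACCESS_SIGNATURE_FIELD_NAME, "<sas-token>");
    BlobServiceClient client = buildAzureBlobClient(appCtx, configuration);
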
-
-        /**
-         * Builds the Azure Data Lake service client using the provided configuration
-         *
-         * @param configuration properties
-         * @return client
-         */
-        public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx,
-                Map<String, String> configuration) throws CompilationException {
-            String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
-            String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
-            String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
-            String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-            String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
-            String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
-            String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
-            String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
-            String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
-            String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
-            // Client builder
-            DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
-            int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
-            RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
-            builder.retryOptions(requestRetryOptions);
-
-            // Endpoint is required
-            if (endpoint == null) {
-                throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
-            }
-            builder.endpoint(endpoint);
-
-            // Shared Key
-            if (accountName != null || accountKey != null) {
-                if (accountName == null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
-                            ACCOUNT_KEY_FIELD_NAME);
-                }
-
-                if (accountKey == null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
-                            ACCOUNT_NAME_FIELD_NAME);
-                }
-
-                Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
-                        MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
-                        CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            ACCOUNT_KEY_FIELD_NAME);
-                }
-                StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
-                builder.credential(credential);
-            }
-
-            // Shared access signature
-            if (sharedAccessSignature != null) {
-                Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
-                        CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
-                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-                }
-                AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
-                builder.credential(credential);
-            }
-
-            // Managed Identity auth
-            if (managedIdentityId != null) {
-                Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
-                        CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
-                        TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            MANAGED_IDENTITY_ID_FIELD_NAME);
-                }
-                builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
-            }
-
-            // Client secret & certificate auth
-            if (clientId != null) {
-                // Both (or neither) client secret and client certificate were provided; only one is allowed
-                if ((clientSecret == null) == (clientCertificate == null)) {
-                    if (clientSecret != null) {
-                        throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
-                                CLIENT_CERTIFICATE_FIELD_NAME);
-                    } else {
-                        throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
-                                CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
-                    }
-                }
-
-                // Tenant ID is required
-                if (tenantId == null) {
-                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
-                            CLIENT_ID_FIELD_NAME);
-                }
-
-                // Client certificate password is not allowed if client secret is used
-                if (clientCertificatePassword != null && clientSecret != null) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
-                            CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
-                }
-
-                // Use AD authentication
-                if (clientSecret != null) {
-                    ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
-                    secret.clientId(clientId);
-                    secret.tenantId(tenantId);
-                    secret.clientSecret(clientSecret);
-                    builder.credential(secret.build());
-                } else {
-                    // Certificate
-                    ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
-                    certificate.clientId(clientId);
-                    certificate.tenantId(tenantId);
-                    try {
-                        InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
-                        if (clientCertificatePassword == null) {
-                            Method pemCertificate = ClientCertificateCredentialBuilder.class
-                                    .getDeclaredMethod("pemCertificate", InputStream.class);
-                            pemCertificate.setAccessible(true);
-                            pemCertificate.invoke(certificate, certificateContent);
-                        } else {
-                            Method pemCertificate = ClientCertificateCredentialBuilder.class
-                                    .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
-                            pemCertificate.setAccessible(true);
-                            pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
-                        }
-                    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
-                        throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-                    }
-                    builder.credential(certificate.build());
-                }
-            }
-
-            // If client id is not present, ensure client secret, certificate, tenant id and client certificate
-            // password are not present
-            if (clientId == null) {
-                Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
-                        CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
-                if (provided.isPresent()) {
-                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
-                            SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-                }
-            }
-
-            try {
-                return builder.buildClient();
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-        }
-
-        public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient,
-                Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
-                IWarningCollector warningCollector) throws CompilationException {
-            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
-            List<BlobItem> filesOnly = new ArrayList<>();
-
-            // Ensure the validity of include/exclude
-            ExternalDataUtils.validateIncludeExclude(configuration);
-
-            BlobContainerClient blobContainer;
-            try {
-                blobContainer = blobServiceClient.getBlobContainerClient(container);
-
-                // Get all objects in a container and extract the paths to files
-                ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
-                listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration));
-                Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
-
-                // Collect the paths to files only
-                collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
-                        includeExcludeMatcher.getMatchersList(), filesOnly);
-
-                // Warn if no files are returned
-                if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
-                    Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                    warningCollector.warn(warning);
-                }
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-
-            return filesOnly;
-        }
-
-        /**
-         * Collects and filters the files only, and excludes any folders
-         *
-         * @param items     storage items
-         * @param predicate predicate to test with for file filtration
-         * @param matchers  include/exclude matchers to test against
-         * @param filesOnly List containing the files only (excluding folders)
-         */
-        private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
-                BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
-            for (BlobItem item : items) {
-                String uri = item.getName();
-
-                // skip folders
-                if (uri.endsWith("/")) {
-                    continue;
-                }
-
-                // No filter, add file
-                if (predicate.test(matchers, uri)) {
-                    filesOnly.add(item);
-                }
-            }
-        }
-
-        public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client,
-                Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
-                IWarningCollector warningCollector) throws CompilationException {
-            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
-            List<PathItem> filesOnly = new ArrayList<>();
-
-            // Ensure the validity of include/exclude
-            ExternalDataUtils.validateIncludeExclude(configuration);
-
-            DataLakeFileSystemClient fileSystemClient;
-            try {
-                fileSystemClient = client.getFileSystemClient(container);
-
-                // Get all objects in a container and extract the paths to files
-                ListPathsOptions listOptions = new ListPathsOptions();
-                boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
-                listOptions.setRecursive(recursive);
-                listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
-                PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
-
-                // Collect the paths to files only
-                collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
-                        includeExcludeMatcher.getMatchersList(), filesOnly);
-
-                // Warn if no files are returned
-                if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
-                    Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                    warningCollector.warn(warning);
-                }
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-
-            return filesOnly;
-        }
-
-        /**
-         * Collects and filters the files only, and excludes any folders
-         *
-         * @param items     storage items
-         * @param predicate predicate to test with for file filtration
-         * @param matchers  include/exclude matchers to test against
-         * @param filesOnly List containing the files only (excluding folders)
-         */
-        private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
-                BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
-            for (PathItem item : items) {
-                String uri = item.getName();
-
-                // skip folders
-                if (uri.endsWith("/")) {
-                    continue;
-                }
-
-                // No filter, add file
-                if (predicate.test(matchers, uri)) {
-                    filesOnly.add(item);
-                }
-            }
-        }
-
-        /**
-         * Validate external dataset properties
-         *
-         * @param configuration properties
-         * @throws CompilationException Compilation exception
-         */
-        public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
-
-            // check if the format property is present
-            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
-            }
-
-            validateIncludeExclude(configuration);
-
-            // Check if the bucket is present
-            BlobServiceClient blobServiceClient;
-            try {
-                String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-                blobServiceClient = buildAzureBlobClient(appCtx, configuration);
-                BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
-
-                // Get all objects in a container and extract the paths to files
-                ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
-                listBlobsOptions.setPrefix(getPrefix(configuration));
-                Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
-
-                if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
-                    Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                    collector.warn(warning);
-                }
-            } catch (CompilationException ex) {
-                throw ex;
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-        }
-
-        /**
-         * Validate external dataset properties
-         *
-         * @param configuration properties
-         * @throws CompilationException Compilation exception
-         */
-        public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
-
-            // check if the format property is present
-            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
-            }
-
-            validateIncludeExclude(configuration);
-
-            // Check if the bucket is present
-            DataLakeServiceClient dataLakeServiceClient;
-            try {
-                String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-                dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
-                DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
-
-                // Get all objects in a container and extract the paths to files
-                ListPathsOptions listPathsOptions = new ListPathsOptions();
-                listPathsOptions.setPath(getPrefix(configuration));
-                Iterable<PathItem> blobItems = fileSystemClient.listPaths(listPathsOptions, null);
-
-                if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
-                    Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                    collector.warn(warning);
-                }
-            } catch (CompilationException ex) {
-                throw ex;
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-        }
-
-        /**
-         * Configures the Hadoop job configuration for reading from Azure Blob storage using the provided configuration
-         *
-         * @param configuration properties
-         * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure
-         * Blob storage</a>
-         */
-        public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
-            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-            String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
-            String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-
-            //Disable caching of the Azure Blob FileSystem
-            HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
-
-            //Key for Hadoop configuration
-            StringBuilder hadoopKey = new StringBuilder();
-            //Value for Hadoop configuration
-            String hadoopValue;
-            if (accountKey != null || sharedAccessSignature != null) {
-                if (accountKey != null) {
-                    hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.');
-                    //Set only the AccountKey
-                    hadoopValue = accountKey;
-                } else {
-                    //Use SAS for the Hadoop FS as a shared access signature is provided
-                    hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.');
-                    //Setting the container is required for SAS
-                    hadoopKey.append(container).append('.');
-                    //Set the SAS token as the value
-                    hadoopValue = sharedAccessSignature;
-                }
-                //Set the endPoint, which includes the AccountName
-                hadoopKey.append(endPoint);
-                //Tells Hadoop we are reading from Blob Storage
-                conf.set(hadoopKey.toString(), hadoopValue);
-            }
-        }
-    }
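
To make the two Hadoop key shapes produced by configureAzureHdfsJobConf above concrete, here is a sketch with
placeholder names, assuming configuration is the dataset's property map (the endpoint normally embeds the
account name):

    JobConf conf = new JobConf();
    // Account-key form: <HADOOP_AZURE_FS_ACCOUNT_KEY>.<endPoint> = <accountKey>
    // SAS form:         <HADOOP_AZURE_FS_SAS>.<container>.<endPoint> = <sharedAccessSignature>
    configureAzureHdfsJobConf(conf, configuration, "myaccount.blob.core.windows.net");
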
-
-    public static class GCS {
-        private GCS() {
-            throw new AssertionError("do not instantiate");
-        }
-
-        //TODO(htowaileb): Add a validation step similar to the other externals, which also checks whether the
-        //bucket is empty upon creating the external dataset
-
-        /**
-         * Builds the client using the provided configuration
-         *
-         * @param configuration properties
-         * @return client
-         * @throws CompilationException CompilationException
-         */
-        public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
-            String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
-
-            StorageOptions.Builder builder = StorageOptions.newBuilder();
-
-            // Use credentials if available
-            if (jsonCredentials != null) {
-                try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
-                    builder.setCredentials(ServiceAccountCredentials.fromStream(credentialsStream));
-                } catch (IOException ex) {
-                    throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-                }
-            }
-
-            return builder.build().getService();
-        }
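
A hedged usage sketch of the builder above; the JSON body stands in for a real service-account key, and
omitting it falls back to anonymous access:

    Map<String, String> configuration = new HashMap<>();
    configuration.put(JSON_CREDENTIALS_FIELD_NAME, "{ \"type\": \"service_account\", ... }");
    Storage storage = buildClient(configuration);
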
-
-        /**
-         * Validate external dataset properties
-         *
-         * @param configuration properties
-         * @throws CompilationException Compilation exception
-         */
-        public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-                IWarningCollector collector) throws CompilationException {
-
-            // check if the format property is present
-            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
-            }
-
-            // parquet is not supported for google cloud storage
-            if (isParquetFormat(configuration)) {
-                throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT,
-                        configuration.get(KEY_FORMAT));
-            }
-
-            validateIncludeExclude(configuration);
-            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
-            try {
-                BlobListOption limitOption = BlobListOption.pageSize(1);
-                BlobListOption prefixOption = BlobListOption.prefix(getPrefix(configuration));
-                Storage storage = buildClient(configuration);
-                Page<Blob> items = storage.list(container, limitOption, prefixOption);
-
-                if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
-                    Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                    collector.warn(warning);
-                }
-            } catch (CompilationException ex) {
-                throw ex;
-            } catch (Exception ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-            }
-        }
-    }
-
     public static int roundUpToNearestFrameSize(int size, int framesize) {
         return ((size / framesize) + 1) * framesize;
     }
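
Worked examples for roundUpToNearestFrameSize as written: integer division truncates, so the helper always adds
at least one frame, bumping even exact multiples:

    roundUpToNearestFrameSize(100, 32); // (100 / 32 + 1) * 32 = 4 * 32 = 128
    roundUpToNearestFrameSize(64, 32);  // (64 / 32 + 1) * 32 = 3 * 32 = 96, not 64
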
@@ -1911,7 +827,7 @@
         return maxArgSz;
     }
 
-    private static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
+    public static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
         return Arrays.stream(parameters).filter(field -> configuration.get(field) != null).findFirst();
     }
 }
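
Widening getFirstNotNull to public lets the new provider-specific utility classes reuse the same
mutual-exclusion check. A minimal sketch of the call pattern, using field names from the Azure code above:

    Optional<String> conflicting = ExternalDataUtils.getFirstNotNull(configuration,
            SHARED_ACCESS_SIGNATURE_FIELD_NAME, MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME);
    if (conflicting.isPresent()) {
        throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, conflicting.get(),
                ACCOUNT_KEY_FIELD_NAME);
    }
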
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
new file mode 100644
index 0000000..e1c10ad
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+public class S3Constants {
+    private S3Constants() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static final String REGION_FIELD_NAME = "region";
+    public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
+    public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
+    public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
+    public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+    // AWS S3 specific error codes
+    public static final String ERROR_INTERNAL_ERROR = "InternalError";
+    public static final String ERROR_SLOW_DOWN = "SlowDown";
+    public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+    /*
+     * Hadoop-AWS
+     * AWS connectors for s3 and s3n are deprecated.
+     */
+    public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
+    public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
+    public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
+    public static final String HADOOP_REGION = "fs.s3a.region";
+    public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
+
+    /*
+     * Internal configurations
+     */
+    //Allows accessing directories as file system path
+    public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
+    //The number of maximum HTTP connections in connection pool
+    public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum";
+    //S3 used protocol
+    public static final String HADOOP_S3_PROTOCOL = "s3a";
+
+    //Hadoop credentials provider key
+    public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider";
+    //Anonymous credential provider
+    public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
+    //Temporary credential provider
+    public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
+}
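
For orientation, the request-level field names above are what a user supplies in the external dataset
configuration, while the fs.s3a.* keys are what gets forwarded to hadoop-aws. A hedged sketch of the
correspondence, with placeholder values:

    Map<String, String> configuration = new HashMap<>();
    configuration.put(S3Constants.REGION_FIELD_NAME, "us-west-2");
    configuration.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, "<access-key-id>");
    configuration.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, "<secret-access-key>");
    // forwarded as fs.s3a.access.key / fs.s3a.secret.key by S3Utils.configureAwsS3HdfsJobConf (below)
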
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
new file mode 100644
index 0000000..a88d59b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.*;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.CleanupUtils;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.S3Response;
+
+public class S3Utils {
+    private S3Utils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static boolean isRetryableError(String errorCode) {
+        return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+    }
+
+    /**
+     * Builds the S3 client using the provided configuration
+     *
+     * @param configuration properties
+     * @return S3 client
+     * @throws CompilationException CompilationException
+     */
+    public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
+        // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+        String regionId = configuration.get(REGION_FIELD_NAME);
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+        S3ClientBuilder builder = S3Client.builder();
+
+        // Credentials
+        AwsCredentialsProvider credentialsProvider;
+
+        // No auth required
+        if (accessKeyId == null) {
+            credentialsProvider = AnonymousCredentialsProvider.create();
+        } else {
+            // auth required, check for temporary or permanent credentials
+            if (sessionToken != null) {
+                credentialsProvider = StaticCredentialsProvider
+                        .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+            } else {
+                credentialsProvider =
+                        StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+            }
+        }
+
+        builder.credentialsProvider(credentialsProvider);
+
+        // Validate the region
+        List<Region> regions = S3Client.serviceMetadata().regions();
+        Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
+
+        if (selectedRegion.isEmpty()) {
+            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
+        }
+        builder.region(selectedRegion.get());
+
+        // Validate the service endpoint if present
+        if (serviceEndpoint != null) {
+            try {
+                URI uri = new URI(serviceEndpoint);
+                try {
+                    builder.endpointOverride(uri);
+                } catch (NullPointerException ex) {
+                    throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+                }
+            } catch (URISyntaxException ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+                        String.format("Invalid service endpoint %s", serviceEndpoint));
+            }
+        }
+
+        return builder.build();
+    }
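
A hedged usage sketch of buildAwsS3Client: anonymous access is selected simply by omitting accessKeyId, and an
unsupported region fails with S3_REGION_NOT_SUPPORTED (the region value here is a placeholder):

    Map<String, String> configuration = new HashMap<>();
    configuration.put(REGION_FIELD_NAME, "us-east-1");
    S3Client s3Client = buildAwsS3Client(configuration); // anonymous credentials, validated region
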
+
+    /**
+     * Configures the Hadoop job configuration for reading from S3 using the provided configuration
+     *
+     * @param configuration      properties
+     * @param numberOfPartitions number of partitions in the cluster
+     */
+    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
+            int numberOfPartitions) {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+        //Disable caching S3 FileSystem
+        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
+
+        /*
+         * Authentication Methods:
+         * 1- Anonymous: no accessKeyId and no secretAccessKey
+         * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
+         * 3- Private: has to provide accessKeyId and secretAccessKey
+         */
+        if (accessKeyId == null) {
+            //Tells hadoop-aws it is an anonymous access
+            conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
+        } else {
+            conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
+            conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
+            if (sessionToken != null) {
+                conf.set(HADOOP_SESSION_TOKEN, sessionToken);
+                //Tells hadoop-aws it is a temporary access
+                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
+            }
+        }
+
+        /*
+         * This is to allow S3 definition to have path-style form. Should always be true to match the current
+         * way we access files in S3
+         */
+        conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
+
+        /*
+         * Set the size of S3 connection pool to be the number of partitions
+         */
+        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+
+        if (serviceEndpoint != null) {
+            // Validation of the URL should be done at hadoop-aws level
+            conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        } else {
+            //Region is ignored and buckets can be found via the central endpoint
+            conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
+        }
+    }
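
Sketch of the temporary-credentials path (method 2 in the comment above); all three values must be supplied
together, and the values shown are placeholders:

    JobConf conf = new JobConf();
    Map<String, String> configuration = new HashMap<>();
    configuration.put(ACCESS_KEY_ID_FIELD_NAME, "<temporary-access-key-id>");
    configuration.put(SECRET_ACCESS_KEY_FIELD_NAME, "<temporary-secret-access-key>");
    configuration.put(SESSION_TOKEN_FIELD_NAME, "<session-token>");
    configureAwsS3HdfsJobConf(conf, configuration, 4); // connection pool sized to 4 partitions, as an example
    // conf now carries fs.s3a.session.token and the TemporaryAWSCredentialsProvider
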
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        // Both parameters should be passed, or neither should be passed (for anonymous/no auth)
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        if (accessKeyId == null || secretAccessKey == null) {
+            // If one is passed, the other is required
+            if (accessKeyId != null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+                        ACCESS_KEY_ID_FIELD_NAME);
+            } else if (secretAccessKey != null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                        SECRET_ACCESS_KEY_FIELD_NAME);
+            }
+        }
+
+        validateIncludeExclude(configuration);
+
+        // Check if the bucket is present
+        S3Client s3Client = buildAwsS3Client(configuration);
+        S3Response response;
+        boolean useOldApi = false;
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        String prefix = getPrefix(configuration);
+
+        try {
+            response = isBucketEmpty(s3Client, container, prefix, false);
+        } catch (S3Exception ex) {
+            // Method not implemented, try falling back to old API
+            try {
+                // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+                if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
+                    useOldApi = true;
+                    response = isBucketEmpty(s3Client, container, prefix, true);
+                } else {
+                    throw ex;
+                }
+            } catch (SdkException ex2) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex2));
+            }
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        } finally {
+            if (s3Client != null) {
+                CleanupUtils.close(s3Client, null);
+            }
+        }
+
+        boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
+                : ((ListObjectsV2Response) response).contents().isEmpty();
+        if (isEmpty && collector.shouldWarn()) {
+            Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+            collector.warn(warning);
+        }
+
+        // The request returns 200 only if the bucket exists; otherwise, an exception is thrown. However, to
+        // ensure coverage, also check that the response is successful rather than relying solely on exceptions
+        if (!response.sdkHttpResponse().isSuccessful()) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+        }
+    }
+
+    /**
+     * Checks for a single object in the specified bucket to determine if the bucket is empty or not.
+     *
+     * @param s3Client  s3 client
+     * @param container the container name
+     * @param prefix    Prefix to be used
+     * @param useOldApi flag whether to use the old API or not
+     * @return returns the S3 response
+     */
+    private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
+        S3Response response;
+        if (useOldApi) {
+            ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
+            listObjectsBuilder.prefix(prefix);
+            response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
+        } else {
+            ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
+            listObjectsBuilder.prefix(prefix);
+            response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
+        }
+        return response;
+    }
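
The maxKeys(1) probe above is the cheapest emptiness test: rather than listing the whole bucket, it asks S3 for
at most one key. A standalone equivalent using the new API (bucket and prefix are placeholders):

    ListObjectsV2Response probe = s3Client.listObjectsV2(
            ListObjectsV2Request.builder().bucket("my-bucket").prefix("data/").maxKeys(1).build());
    boolean bucketIsEmpty = probe.contents().isEmpty();
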
+
+    /**
+     * Returns the list of S3 objects.
+     *
+     * @param configuration         properties
+     * @param includeExcludeMatcher include/exclude matchers to apply
+     */
+    public static List<S3Object> listS3Objects(Map<String, String> configuration,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            IWarningCollector warningCollector) throws CompilationException {
+        // Prepare to retrieve the objects
+        List<S3Object> filesOnly;
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        S3Client s3Client = buildAwsS3Client(configuration);
+        String prefix = getPrefix(configuration);
+
+        try {
+            filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+        } catch (S3Exception ex) {
+            // New API is not implemented, try falling back to old API
+            try {
+                // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+                if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
+                    filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+                } else {
+                    throw ex;
+                }
+            } catch (SdkException ex2) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex2));
+            }
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        } finally {
+            if (s3Client != null) {
+                CleanupUtils.close(s3Client, null);
+            }
+        }
+
+        // Warn if no files are returned
+        if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+            Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+            warningCollector.warn(warning);
+        }
+
+        return filesOnly;
+    }
+
+    /**
+     * Uses the latest API to retrieve the objects from the storage.
+     *
+     * @param s3Client              S3 client
+     * @param container             container name
+     * @param prefix                definition prefix
+     * @param includeExcludeMatcher include/exclude matchers to apply
+     */
+    private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+        String newMarker = null;
+        List<S3Object> filesOnly = new ArrayList<>();
+
+        ListObjectsV2Response listObjectsResponse;
+        ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
+        listObjectsBuilder.prefix(prefix);
+
+        while (true) {
+            // List the objects from the start, or from the last marker in case of truncated result
+            if (newMarker == null) {
+                listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+            } else {
+                listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+            }
+
+            // Collect the paths to files only
+            collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Done if the result is not truncated; otherwise, save the continuation token for the next request
+            if (!listObjectsResponse.isTruncated()) {
+                break;
+            } else {
+                newMarker = listObjectsResponse.nextContinuationToken();
+            }
+        }
+
+        return filesOnly;
+    }
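+
+    // Note (an assumption about the AWS SDK v2, not part of this change):
+    // s3Client.listObjectsV2Paginator(request) performs this continuation-token loop internally;
+    // the manual loop is kept here so the old-API fallback below can mirror the same structure.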
+
+    /**
+     * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage.
+     *
+     * @param s3Client              S3 client
+     * @param container             container name
+     * @param prefix                definition prefix
+     * @param includeExcludeMatcher include/exclude matchers to apply
+     * @return the list of S3 objects that are files
+     */
+    private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+        String newMarker = null;
+        List<S3Object> filesOnly = new ArrayList<>();
+
+        ListObjectsResponse listObjectsResponse;
+        ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
+        listObjectsBuilder.prefix(prefix);
+
+        while (true) {
+            // List the objects from the start, or from the last marker in case of truncated result
+            if (newMarker == null) {
+                listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
+            } else {
+                listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
+            }
+
+            // Collect the paths to files only
+            collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Done if the result is not truncated; otherwise, save the marker for the next request
+            if (!listObjectsResponse.isTruncated()) {
+                break;
+            } else {
+                newMarker = listObjectsResponse.nextMarker();
+            }
+        }
+
+        return filesOnly;
+    }
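+
+    // Caveat (stated as an assumption about the V1 ListObjects API): nextMarker is only
+    // populated when a delimiter is specified; against providers that omit it, the key of the
+    // last returned object would have to serve as the marker for the next request.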
+
+    /**
+     * AWS S3 returns all the objects as paths, not differentiating between folders and files. A path is considered
+     * a file if it does not end with a "/", which is the separator in a folder structure.
+     *
+     * @param s3Objects List of returned objects
+     * @param predicate predicate to test with for file filtration
+     * @param matchers  include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate,
+            List<Matcher> matchers, List<S3Object> filesOnly) {
+        for (S3Object object : s3Objects) {
+            // skip folders
+            if (object.key().endsWith("/")) {
+                continue;
+            }
+
+            // No filter, add file
+            if (predicate.test(matchers, object.key())) {
+                filesOnly.add(object);
+            }
+        }
+    }
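+
+    // For example (hypothetical keys): a returned key "data/2021/" is skipped as a folder
+    // marker, while "data/2021/records.json" is kept if it passes the include/exclude predicate.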
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java
new file mode 100644
index 0000000..9ade27b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.azure.blob_storage;
+
+/*
+ * Note: Azure Blob and Azure Data Lake use identical authentication, so they share the same properties.
+ * If they ever diverge, separate properties for AzureBlob and AzureDataLake will need to be created.
+ */
+public class AzureConstants {
+    private AzureConstants() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    /*
+     * Asterix Configuration Keys
+     */
+    public static final String MANAGED_IDENTITY_ID_FIELD_NAME = "managedIdentityId";
+    public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
+    public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
+    public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature";
+    public static final String TENANT_ID_FIELD_NAME = "tenantId";
+    public static final String CLIENT_ID_FIELD_NAME = "clientId";
+    public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret";
+    public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate";
+    public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
+    public static final String ENDPOINT_FIELD_NAME = "endpoint";
+
+    // Specific Azure data lake property
+    /*
+    Data Lake (a true file system) reads only the files directly under the specified prefix, for example:
+    storage/myData/personal/file1.json
+    storage/myData/personal/file2.json
+    storage/myData/file3.json
+    If the prefix used is "myData", then only file3.json is read. However, if the property "recursive" is set
+    to "true" when creating the external dataset, then it goes recursively over all the paths, and the result
+    is file1.json, file2.json and file3.json.
+     */
+    public static final String RECURSIVE_FIELD_NAME = "recursive";
+
+    /*
+     * Hadoop-Azure
+     */
+    //Used when accountName and accountKey are provided
+    public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key";
+    //Used when a sharedAccessSignature is provided
+    public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
+    public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
+    public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss";
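+
+    // For illustration (hypothetical account/container names), the keys composed by
+    // AzureUtils.configureAzureHdfsJobConf look like:
+    //   fs.azure.account.key.myaccount.blob.core.windows.net
+    //   fs.azure.sas.mycontainer.myaccount.blob.core.windows.net
+    // where the trailing endpoint part is the value passed as endPoint.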
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
new file mode 100644
index 0000000..0dc9ad2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.azure.blob_storage;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.external.util.ExternalDataUtils.getFirstNotNull;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ACCOUNT_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ACCOUNT_NAME_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_CERTIFICATE_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_SECRET_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_FS_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_FS_SAS;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.MANAGED_IDENTITY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.RECURSIVE_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.TENANT_ID_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.identity.ClientCertificateCredentialBuilder;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.policy.RequestRetryOptions;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import com.azure.storage.file.datalake.models.PathItem;
+
+public class AzureUtils {
+    private AzureUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    /**
+     * Builds the Azure Blob storage client using the provided configuration
+     *
+     * @param appCtx        application context, used to get the Azure request timeout
+     * @param configuration properties
+     * @return client
+     */
+    public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx, Map<String, String> configuration)
+            throws CompilationException {
+        String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+        String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+        String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+        String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+        String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+        String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+        String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+        String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+        String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+        // Client builder
+        BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
+        int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+        RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+        builder.retryOptions(requestRetryOptions);
+
+        // Endpoint is required
+        if (endpoint == null) {
+            throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+        }
+        builder.endpoint(endpoint);
+
+        // Shared Key
+        if (accountName != null || accountKey != null) {
+            if (accountName == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+                        ACCOUNT_KEY_FIELD_NAME);
+            }
+
+            if (accountKey == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+                        ACCOUNT_NAME_FIELD_NAME);
+            }
+
+            Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+                    MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        ACCOUNT_KEY_FIELD_NAME);
+            }
+            StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+            builder.credential(credential);
+        }
+
+        // Shared access signature
+        if (sharedAccessSignature != null) {
+            Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+                    CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+                    CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+            }
+            AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+            builder.credential(credential);
+        }
+
+        // Managed Identity auth
+        if (managedIdentityId != null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        MANAGED_IDENTITY_ID_FIELD_NAME);
+            }
+            builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+        }
+
+        // Client secret & certificate auth
+        if (clientId != null) {
+            // Both (or neither) client secret and client certificate were provided; only one is allowed
+            if ((clientSecret == null) == (clientCertificate == null)) {
+                if (clientSecret != null) {
+                    throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+                            CLIENT_CERTIFICATE_FIELD_NAME);
+                } else {
+                    throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+                            CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+                }
+            }
+
+            // Tenant ID is required
+            if (tenantId == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+                        CLIENT_ID_FIELD_NAME);
+            }
+
+            // Client certificate password is not allowed if client secret is used
+            if (clientCertificatePassword != null && clientSecret != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+            }
+
+            // Use AD authentication
+            if (clientSecret != null) {
+                ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+                secret.clientId(clientId);
+                secret.tenantId(tenantId);
+                secret.clientSecret(clientSecret);
+                builder.credential(secret.build());
+            } else {
+                // Certificate
+                ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+                certificate.clientId(clientId);
+                certificate.tenantId(tenantId);
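+                // The InputStream-based pemCertificate/pfxCertificate overloads are invoked
+                // reflectively below; presumably (an assumption) they are not public in the Azure
+                // SDK version in use, whose public API accepts certificate file paths instead.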
+                try {
+                    InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+                    if (clientCertificatePassword == null) {
+                        Method pemCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pemCertificate", InputStream.class);
+                        pemCertificate.setAccessible(true);
+                        pemCertificate.invoke(certificate, certificateContent);
+                    } else {
+                        Method pfxCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+                        pfxCertificate.setAccessible(true);
+                        pfxCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+                    }
+                } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+                    throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+                }
+                builder.credential(certificate.build());
+            }
+        }
+
+        // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+        // password are not present
+        if (clientId == null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, CLIENT_ID_FIELD_NAME,
+                        provided.get());
+            }
+        }
+
+        try {
+            return builder.buildClient();
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
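+
+    // Minimal sketch (hypothetical endpoint/account values, not from this change):
+    //   Map<String, String> cfg = Map.of(
+    //           ENDPOINT_FIELD_NAME, "https://myaccount.blob.core.windows.net",
+    //           ACCOUNT_NAME_FIELD_NAME, "myaccount",
+    //           ACCOUNT_KEY_FIELD_NAME, "<base64-key>");
+    //   BlobServiceClient client = buildAzureBlobClient(appCtx, cfg);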
+
+    /**
+     * Builds the Azure Data Lake storage client using the provided configuration
+     *
+     * @param appCtx        application context, used to get the Azure request timeout
+     * @param configuration properties
+     * @return client
+     */
+    public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+        String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+        String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+        String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+        String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+        String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+        String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+        String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+        String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+        // Client builder
+        DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
+        int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+        RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+        builder.retryOptions(requestRetryOptions);
+
+        // Endpoint is required
+        if (endpoint == null) {
+            throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+        }
+        builder.endpoint(endpoint);
+
+        // Shared Key
+        if (accountName != null || accountKey != null) {
+            if (accountName == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+                        ACCOUNT_KEY_FIELD_NAME);
+            }
+
+            if (accountKey == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+                        ACCOUNT_NAME_FIELD_NAME);
+            }
+
+            Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+                    MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        ACCOUNT_KEY_FIELD_NAME);
+            }
+            StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+            builder.credential(credential);
+        }
+
+        // Shared access signature
+        if (sharedAccessSignature != null) {
+            Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+                    CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+                    CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+            }
+            AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+            builder.credential(credential);
+        }
+
+        // Managed Identity auth
+        if (managedIdentityId != null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                        MANAGED_IDENTITY_ID_FIELD_NAME);
+            }
+            builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+        }
+
+        // Client secret & certificate auth
+        if (clientId != null) {
+            // Both (or neither) client secret and client certificate were provided; only one is allowed
+            if ((clientSecret == null) == (clientCertificate == null)) {
+                if (clientSecret != null) {
+                    throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+                            CLIENT_CERTIFICATE_FIELD_NAME);
+                } else {
+                    throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+                            CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+                }
+            }
+
+            // Tenant ID is required
+            if (tenantId == null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+                        CLIENT_ID_FIELD_NAME);
+            }
+
+            // Client certificate password is not allowed if client secret is used
+            if (clientCertificatePassword != null && clientSecret != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+            }
+
+            // Use AD authentication
+            if (clientSecret != null) {
+                ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+                secret.clientId(clientId);
+                secret.tenantId(tenantId);
+                secret.clientSecret(clientSecret);
+                builder.credential(secret.build());
+            } else {
+                // Certificate
+                ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+                certificate.clientId(clientId);
+                certificate.tenantId(tenantId);
+                try {
+                    InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+                    if (clientCertificatePassword == null) {
+                        Method pemCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pemCertificate", InputStream.class);
+                        pemCertificate.setAccessible(true);
+                        pemCertificate.invoke(certificate, certificateContent);
+                    } else {
+                        Method pfxCertificate = ClientCertificateCredentialBuilder.class
+                                .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+                        pfxCertificate.setAccessible(true);
+                        pfxCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+                    }
+                } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+                    throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+                }
+                builder.credential(certificate.build());
+            }
+        }
+
+        // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+        // password are not present
+        if (clientId == null) {
+            Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+                    CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+            if (provided.isPresent()) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, CLIENT_ID_FIELD_NAME,
+                        provided.get());
+            }
+        }
+
+        try {
+            return builder.buildClient();
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient, Map<String, String> configuration,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            IWarningCollector warningCollector) throws CompilationException {
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        List<BlobItem> filesOnly = new ArrayList<>();
+
+        // Ensure the validity of include/exclude
+        validateIncludeExclude(configuration);
+
+        BlobContainerClient blobContainer;
+        try {
+            blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+            listBlobsOptions.setPrefix(getPrefix(configuration));
+            Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+            // Collect the paths to files only
+            collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Warn if no files are returned
+            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                warningCollector.warn(warning);
+            }
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+
+        return filesOnly;
+    }
+
+    /**
+     * Collects and filters the files only, and excludes any folders
+     *
+     * @param items     storage items
+     * @param predicate predicate to test with for file filtration
+     * @param matchers  include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
+            BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
+        for (BlobItem item : items) {
+            String uri = item.getName();
+
+            // skip folders
+            if (uri.endsWith("/")) {
+                continue;
+            }
+
+            // No filter, add file
+            if (predicate.test(matchers, uri)) {
+                filesOnly.add(item);
+            }
+        }
+    }
+
+    public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client, Map<String, String> configuration,
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            IWarningCollector warningCollector) throws CompilationException {
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        List<PathItem> filesOnly = new ArrayList<>();
+
+        // Ensure the validity of include/exclude
+        validateIncludeExclude(configuration);
+
+        DataLakeFileSystemClient fileSystemClient;
+        try {
+            fileSystemClient = client.getFileSystemClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListPathsOptions listOptions = new ListPathsOptions();
+            boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+            listOptions.setRecursive(recursive);
+            listOptions.setPath(getPrefix(configuration, false));
+            PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+            // Collect the paths to files only
+            collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Warn if no files are returned
+            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                warningCollector.warn(warning);
+            }
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+
+        return filesOnly;
+    }
+
+    /**
+     * Collects and filters the files only, and excludes any folders
+     *
+     * @param items     storage items
+     * @param predicate predicate to test with for file filtration
+     * @param matchers  include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
+            BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
+        for (PathItem item : items) {
+            String uri = item.getName();
+
+            // skip folders
+            if (uri.endsWith("/")) {
+                continue;
+            }
+
+            // No filter, add file
+            if (predicate.test(matchers, uri)) {
+                filesOnly.add(item);
+            }
+        }
+    }
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        validateIncludeExclude(configuration);
+
+        // Check if the bucket is present
+        BlobServiceClient blobServiceClient;
+        try {
+            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+            blobServiceClient = buildAzureBlobClient(appCtx, configuration);
+            BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+            listBlobsOptions.setPrefix(getPrefix(configuration));
+            Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+            if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        validateIncludeExclude(configuration);
+
+        // Check if the bucket is present
+        DataLakeServiceClient dataLakeServiceClient;
+        try {
+            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+            dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
+            DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListPathsOptions listPathsOptions = new ListPathsOptions();
+            listPathsOptions.setPath(getPrefix(configuration));
+            Iterable<PathItem> blobItems = fileSystemClient.listPaths(listPathsOptions, null);
+
+            if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Configures the Hadoop job to read from Azure Blob storage using the provided configuration
+     *
+     * @param conf          Hadoop job configuration to set the properties on
+     * @param configuration properties
+     * @param endPoint      Azure Blob storage endpoint, which includes the account name
+     * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure
+     * Blob storage</a>
+     */
+    public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+        String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+
+        //Disable caching for the Azure Blob FileSystem
+        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
+
+        //Key for Hadoop configuration
+        StringBuilder hadoopKey = new StringBuilder();
+        //Value for Hadoop configuration
+        String hadoopValue;
+        if (accountKey != null || sharedAccessSignature != null) {
+            if (accountKey != null) {
+                hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.');
+                //Set only the AccountKey
+                hadoopValue = accountKey;
+            } else {
+                //Use SAS for Hadoop FS as a sharedAccessSignature is provided
+                hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.');
+                //Setting the container is required for SAS
+                hadoopKey.append(container).append('.');
+                //Set the shared access signature value
+                hadoopValue = sharedAccessSignature;
+            }
+            //Set the endPoint, which includes the AccountName
+            hadoopKey.append(endPoint);
+            //Tells Hadoop we are reading from Blob Storage
+            conf.set(hadoopKey.toString(), hadoopValue);
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
new file mode 100644
index 0000000..8a0be99
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+public class GCSConstants {
+    private GCSConstants() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
new file mode 100644
index 0000000..553733f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.google.api.gax.paging.Page;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+public class GCSUtils {
+    private GCSUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    //TODO(htowaileb): Add validation step similar to other externals, which also checks if empty bucket
+    //upon creating the external dataset
+
+    /**
+     * Builds the client using the provided configuration
+     *
+     * @param configuration properties
+     * @return client
+     * @throws CompilationException CompilationException
+     */
+    public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
+        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+        StorageOptions.Builder builder = StorageOptions.newBuilder();
+
+        // Use credentials if available
+        if (jsonCredentials != null) {
+            try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes(UTF_8))) {
+                builder.setCredentials(ServiceAccountCredentials.fromStream(credentialsStream));
+            } catch (IOException ex) {
+                throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+            }
+        }
+
+        return builder.build().getService();
+    }
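+
+    // Sketch (hypothetical values): building a client from an inline service-account JSON string
+    //   Map<String, String> cfg = Map.of(JSON_CREDENTIALS_FIELD_NAME, "{ ...service account json... }");
+    //   Storage storage = buildClient(cfg);
+    // Without jsonCredentials, the builder presumably falls back to application-default credentials.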
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+            IWarningCollector collector) throws CompilationException {
+
+        // check if the format property is present
+        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        // parquet is not supported for google cloud storage
+        if (isParquetFormat(configuration)) {
+            throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT, configuration.get(KEY_FORMAT));
+        }
+
+        validateIncludeExclude(configuration);
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        try {
+            Storage.BlobListOption limitOption = Storage.BlobListOption.pageSize(1);
+            Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(getPrefix(configuration));
+            Storage storage = buildClient(configuration);
+            Page<Blob> items = storage.list(container, limitOption, prefixOption);
+
+            if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
+                Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                collector.warn(warning);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 90ea04b..91afbd8 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.input.record.reader.awss3;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;