[ASTERIXDB-3565][EXT]: Add impersonate service account auth for GCS

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Add support for impersonating a service account as
an authentication method for GCS links. This works
by providing a target service account to impersonate
and source credentials (ours) that are used to
perform the impersonation. There is no need to store
any temporary credentials/tokens as the SDK automatically
picks up the generated token and, if expired, automatically
refreshes it for subsequent requests.

Ext-ref: MB-65121
Change-Id: Ie1d69faa45a03550c8e0fe66eb18c2ae53a8454a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19444
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 4c382d0..10f536b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -78,14 +78,15 @@
 
         String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
         if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
-            credentials = generateAwsCredentials(configuration);
+            return generateAwsCredentials(configuration);
+        } else {
+            // this should never happen
+            throw new IllegalArgumentException("Unsupported external source type: " + type);
         }
-
-        return credentials;
     }
 
     // TODO: this can probably be refactored out into something that is AWS-specific
-    private Object generateAwsCredentials(Map<String, String> configuration)
+    private AwsSessionCredentials generateAwsCredentials(Map<String, String> configuration)
             throws HyracksDataException, CompilationException {
         String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
         AwsSessionCredentials credentials;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 3ffb080..c6d7ba9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -61,6 +61,7 @@
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
+    "gcp\.impersonate\.service\.account\.duration" : 900,
     "library\.deploy\.timeout" : 1800,
     "log\.dir" : "logs/",
     "log\.level" : "DEBUG",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index aeb3cea..562e195 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -61,6 +61,7 @@
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
+    "gcp\.impersonate\.service\.account\.duration" : 900,
     "library\.deploy\.timeout" : 1800,
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index b475612..132fa5b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -61,6 +61,7 @@
     "compiler\.textsearchmemory" : 163840,
     "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
+    "gcp\.impersonate\.service\.account\.duration" : 900,
     "library\.deploy\.timeout" : 1800,
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 3a34365..02e2881 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -66,7 +66,7 @@
     @Override
     ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
-        return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
+        return new GCSCloudClient(config, GCSAuthUtils.buildClient(appCtx, configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 62437b2..d233606 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -64,7 +64,12 @@
                 75,
                 "Percentage of duration passed before assume role credentials need to be refreshed, the value ranges "
                         + "from 25 to 90, default is 75. For example, if the value is set to 65, this means the "
-                        + "credentials need to be refreshed if 65% of the total expiration duration is already passed");
+                        + "credentials need to be refreshed if 65% of the total expiration duration is already passed"),
+        GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION(
+                getRangedIntegerType(60, 3600),
+                900,
+                "GCS impersonating service account duration in seconds. "
+                        + "Range from 60 seconds (1 min) to 3600 seconds (1 hour)");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -94,6 +99,7 @@
                 case AZURE_REQUEST_TIMEOUT:
                 case AWS_ASSUME_ROLE_DURATION:
                 case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE:
+                case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION:
                     return Section.COMMON;
                 case CC_JAVA_OPTS:
                 case NC_JAVA_OPTS:
@@ -182,4 +188,8 @@
     public int getAwsRefreshAssumeRoleThresholdPercentage() {
         return accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE);
     }
+
+    public int getGcpImpersonateServiceAccountDuration() {
+        return accessor.getInt(Option.GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 46d63bb..463695e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -326,6 +326,8 @@
     CSV_INVALID_FORCE_QUOTE(1218),
     CSV_INVALID_ESCAPE(1219),
     CANNOT_TRUNCATE_DATASET_TYPE(1220),
+    NO_VALID_AUTHENTICATION_PARAMS_PROVIDED(1221),
+    NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index d6a171f..df08016 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -328,6 +328,8 @@
 1218 = '%1$s' is not a valid force-quote input. The length of a force-quote input should be 1 character
 1219 = '%1$s' is not a valid escape. The length of a escape should be 1
 1220 = Cannot truncate %1$s '%2$s'
+1221 = No valid authentication parameters were provided
+1222 = No valid authentication parameters were provided to impersonate service account
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 89da065..1ef3fcd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -27,13 +27,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -46,13 +47,15 @@
 
 public class GCSInputStream extends AbstractExternalInputStream {
 
+    private final IApplicationContext ncAppCtx;
     private final Storage client;
     private final String container;
     private static final int MAX_ATTEMPTS = 5; // We try a total of 5 times in case of retryable errors
 
-    public GCSInputStream(Map<String, String> configuration, List<String> filePaths,
+    public GCSInputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths,
             IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
         super(configuration, filePaths, valueEmbedder);
+        this.ncAppCtx = ncAppCtx;
         this.client = buildClient(configuration);
         this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
@@ -136,7 +139,7 @@
 
     private Storage buildClient(Map<String, String> configuration) throws HyracksDataException {
         try {
-            return GCSUtils.buildClient(configuration);
+            return GCSAuthUtils.buildClient(ncAppCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index b6ad3cd..2de55a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,7 +48,9 @@
     public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
         IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
         int partition = context.getPartition();
-        return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+        IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        return new GCSInputStream(ncAppCtx, configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
                 valueEmbedder);
     }
 
@@ -65,7 +68,8 @@
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
 
         // get the items
-        List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector,
+        IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
+        List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, includeExcludeMatcher, warningCollector,
                 externalDataPrefix, evaluator);
 
         // Distribute work load amongst the partitions
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
index db7673c..301bfc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -37,7 +38,8 @@
     @Override
     protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
             throws AlgebricksException {
-        GCSUtils.configureHdfsJobConf(conf, configuration);
+        int numberOfPartitions = getPartitionConstraint().getLocations().length;
+        GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 17cad3e..874c3bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
@@ -30,6 +31,7 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
 import org.apache.asterix.external.util.google.gcs.GCSConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
 import org.apache.hadoop.mapred.JobConf;
@@ -59,7 +61,8 @@
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
 
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector,
+        IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+        List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, includeExcludeMatcher, warningCollector,
                 externalDataPrefix, evaluator);
 
         // get path
@@ -71,7 +74,7 @@
         // configure hadoop input splits
         JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
         int numberOfPartitions = getPartitionConstraint().getLocations().length;
-        GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
+        GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
         configureHdfsConf(conf, configuration);
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 0fed43f..591829b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,7 +35,7 @@
 import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
-import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
+import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.configureHdfsJobConf;
 import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -727,7 +727,7 @@
                 validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
-                validateProperties(configuration, srcLoc, collector);
+                validateProperties(appCtx, configuration, srcLoc, collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
                 HDFSUtils.validateProperties(configuration, srcLoc, collector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f6ca480..035415d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -105,7 +105,7 @@
 public class S3AuthUtils {
     enum AuthenticationType {
         ANONYMOUS,
-        ARN,
+        ARN_ASSUME_ROLE,
         INSTANCE_PROFILE,
         ACCESS_KEYS,
         BAD_AUTHENTICATION
@@ -120,7 +120,8 @@
     }
 
     public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) {
-        return ERROR_EXPIRED_TOKEN.equals(errorCode) && getAuthenticationType(configuration) == AuthenticationType.ARN;
+        return ERROR_EXPIRED_TOKEN.equals(errorCode)
+                && getAuthenticationType(configuration) == AuthenticationType.ARN_ASSUME_ROLE;
     }
 
     /**
@@ -168,7 +169,7 @@
         switch (authenticationType) {
             case ANONYMOUS:
                 return AnonymousCredentialsProvider.create();
-            case ARN:
+            case ARN_ASSUME_ROLE:
                 return getTrustAccountCredentials(appCtx, configuration);
             case INSTANCE_PROFILE:
                 return getInstanceProfileCredentials(configuration);
@@ -206,7 +207,7 @@
         if (noAuth(configuration)) {
             return AuthenticationType.ANONYMOUS;
         } else if (roleArn != null) {
-            return AuthenticationType.ARN;
+            return AuthenticationType.ARN_ASSUME_ROLE;
         } else if (instanceProfile != null) {
             return AuthenticationType.INSTANCE_PROFILE;
         } else if (accessKeyId != null || secretAccessKey != null) {
@@ -232,11 +233,11 @@
     }
 
     /**
-     * Returns the cached credentials if valid, otherwise, generates new credentials by assume a role
+     * Returns the cached credentials if valid, otherwise, generates new credentials
      *
      * @param appCtx application context
      * @param configuration configuration
-     * @return returns the cached credentials if valid, otherwise, generates new credentials by assume a role
+     * @return returns the cached credentials if valid, otherwise, generates new credentials
      * @throws CompilationException CompilationException
      */
     public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx,
@@ -278,20 +279,8 @@
         if (externalId != null) {
             builder.externalId(externalId);
         }
-
-        // credentials to be used to assume the role
-        AwsCredentialsProvider credentialsProvider;
         AssumeRoleRequest request = builder.build();
-        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        if ("true".equalsIgnoreCase(instanceProfile)) {
-            credentialsProvider = getInstanceProfileCredentials(configuration, true);
-        } else if (accessKeyId != null && secretAccessKey != null) {
-            credentialsProvider = getAccessKeyCredentials(configuration, true);
-        } else {
-            throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
-        }
+        AwsCredentialsProvider credentialsProvider = getCredentialsToAssumeRole(configuration);
 
         // assume the role from the provided arn
         try (StsClient stsClient =
@@ -305,13 +294,22 @@
         }
     }
 
-    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+    private static AwsCredentialsProvider getCredentialsToAssumeRole(Map<String, String> configuration)
             throws CompilationException {
-        return getInstanceProfileCredentials(configuration, false);
+        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        if (instanceProfile != null) {
+            return getInstanceProfileCredentials(configuration);
+        } else if (accessKeyId != null || secretAccessKey != null) {
+            return getAccessKeyCredentials(configuration);
+        } else {
+            throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+        }
     }
 
-    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration,
-            boolean assumeRoleAuthentication) throws CompilationException {
+    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+            throws CompilationException {
         String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
 
         // only "true" value is allowed
@@ -319,24 +317,17 @@
             throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
         }
 
-        if (!assumeRoleAuthentication) {
-            String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                    SESSION_TOKEN_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
+        String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                SESSION_TOKEN_FIELD_NAME);
+        if (notAllowed != null) {
+            throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                    INSTANCE_PROFILE_FIELD_NAME);
         }
         return InstanceProfileCredentialsProvider.create();
     }
 
     private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration)
             throws CompilationException {
-        return getAccessKeyCredentials(configuration, false);
-    }
-
-    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration,
-            boolean assumeRoleAuthentication) throws CompilationException {
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
         String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
@@ -350,14 +341,6 @@
                     ACCESS_KEY_ID_FIELD_NAME);
         }
 
-        if (!assumeRoleAuthentication) {
-            String notAllowed = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-        }
-
         // use session token if provided
         if (sessionToken != null) {
             return StaticCredentialsProvider
@@ -423,13 +406,13 @@
      * @param configuration external details configuration
      */
     private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf,
-            Map<String, String> configuration) {
+            Map<String, String> configuration) throws CompilationException {
         AuthenticationType authenticationType = getAuthenticationType(configuration);
         switch (authenticationType) {
             case ANONYMOUS:
                 jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
                 break;
-            case ARN:
+            case ARN_ASSUME_ROLE:
                 jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE);
                 jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME));
                 jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME));
@@ -458,6 +441,7 @@
                 }
                 break;
             case BAD_AUTHENTICATION:
+                throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
new file mode 100644
index 0000000..4473ad7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HadoopAuthServiceAccount.IMPERSONATE_SERVICE_ACCOUNT;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ImpersonatedCredentials;
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+public class GCSAuthUtils {
+    enum AuthenticationType {
+        ANONYMOUS,
+        IMPERSONATE_SERVICE_ACCOUNT,
+        APPLICATION_DEFAULT_CREDENTIALS,
+        SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS,
+        BAD_AUTHENTICATION
+    }
+
+    private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper();
+    private static final List<String> READ_WRITE_SCOPE_PERMISSION =
+            Collections.singletonList("https://www.googleapis.com/auth/devstorage.read_write");
+    static {
+        JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+    }
+
+    private GCSAuthUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    /**
+     * Builds the client using the provided configuration
+     *
+     * @param appCtx application context
+     * @param configuration properties
+     * @return Storage client
+     * @throws CompilationException CompilationException
+     */
+    public static Storage buildClient(IApplicationContext appCtx, Map<String, String> configuration)
+            throws CompilationException {
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+        Credentials credentials = buildCredentials(appCtx, configuration);
+        StorageOptions.Builder builder = StorageOptions.newBuilder();
+        builder.setCredentials(credentials);
+
+        if (endpoint != null) {
+            builder.setHost(endpoint);
+        }
+
+        return builder.build().getService();
+    }
+
+    public static Credentials buildCredentials(IApplicationContext appCtx, Map<String, String> configuration) throws CompilationException {
+        AuthenticationType authenticationType = getAuthenticationType(configuration);
+        return switch (authenticationType) {
+            case ANONYMOUS -> NoCredentials.getInstance();
+            case IMPERSONATE_SERVICE_ACCOUNT -> getImpersonatedServiceAccountCredentials(appCtx, configuration);
+            case APPLICATION_DEFAULT_CREDENTIALS -> getApplicationDefaultCredentials(configuration);
+            case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS -> getServiceAccountKeyCredentials(configuration);
+            case BAD_AUTHENTICATION -> throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+        };
+    }
+
+    private static AuthenticationType getAuthenticationType(Map<String, String> configuration) {
+        String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+        String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+        if (noAuth(configuration)) {
+            return AuthenticationType.ANONYMOUS;
+        } else if (impersonateServiceAccount != null) {
+            return AuthenticationType.IMPERSONATE_SERVICE_ACCOUNT;
+        } else if (applicationDefaultCredentials != null) {
+            return AuthenticationType.APPLICATION_DEFAULT_CREDENTIALS;
+        } else if (jsonCredentials != null) {
+            return AuthenticationType.SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS;
+        } else {
+            return AuthenticationType.BAD_AUTHENTICATION;
+        }
+    }
+
+    private static boolean noAuth(Map<String, String> configuration) {
+        return getNonNull(configuration, APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, JSON_CREDENTIALS_FIELD_NAME,
+                IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME) == null;
+    }
+
+    /**
+     * Returns the cached credentials if valid, otherwise, generates new credentials
+     *
+     * @param appCtx application context
+     * @param configuration configuration
+     * @return returns the cached credentials if valid, otherwise, generates new credentials
+     * @throws CompilationException CompilationException
+     */
+    public static GoogleCredentials getImpersonatedServiceAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        GoogleCredentials sourceCredentials = getCredentialsToImpersonateServiceAccount(configuration);
+        String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+        int duration = appCtx.getExternalProperties().getGcpImpersonateServiceAccountDuration();
+
+        // Create impersonated credentials
+        return ImpersonatedCredentials.create(sourceCredentials, impersonateServiceAccount, null,
+                READ_WRITE_SCOPE_PERMISSION, duration);
+    }
+
+    private static GoogleCredentials getCredentialsToImpersonateServiceAccount(Map<String, String> configuration)
+            throws CompilationException {
+        String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+        if (applicationDefaultCredentials != null) {
+            return getApplicationDefaultCredentials(configuration);
+        } else if (jsonCredentials != null) {
+            return getServiceAccountKeyCredentials(configuration);
+        } else {
+            throw new CompilationException(
+                    ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT);
+        }
+    }
+
+    private static GoogleCredentials getApplicationDefaultCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        try {
+            String notAllowed = getNonNull(configuration, JSON_CREDENTIALS_FIELD_NAME);
+            if (notAllowed != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+            }
+            return GoogleCredentials.getApplicationDefault();
+        } catch (Exception ex) {
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        }
+    }
+
+    private static GoogleCredentials getServiceAccountKeyCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+        try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
+            return GoogleCredentials.fromStream(credentialsStream);
+        } catch (IOException ex) {
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        } catch (Exception ex) {
+            throw new CompilationException(EXTERNAL_SOURCE_ERROR,
+                    "Encountered an issue while processing the JSON credentials. Please ensure the provided credentials are valid.");
+        }
+    }
+
+    public static void configureHdfsJobConf(JobConf jobConf, Map<String, String> configuration)
+            throws AlgebricksException {
+        configureHdfsJobConf(jobConf, configuration, 0);
+    }
+
+    /**
+     * Builds the client using the provided configuration
+     *
+     * @param configuration      properties
+     * @param numberOfPartitions number of partitions
+     */
+    public static void configureHdfsJobConf(JobConf jobConf, Map<String, String> configuration, int numberOfPartitions)
+            throws AlgebricksException {
+        setHadoopCredentials(jobConf, configuration);
+
+        // set endpoint if provided, default is https://storage.googleapis.com/
+        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+        if (endpoint != null) {
+            jobConf.set(HADOOP_ENDPOINT, endpoint);
+        }
+
+        // disable caching FileSystem
+        HDFSUtils.disableHadoopFileSystemCache(jobConf, HADOOP_GCS_PROTOCOL);
+
+        // TODO(htowaileb): make configurable, in case we hit rate limits then we can reduce it, default is 15
+        if (numberOfPartitions != 0) {
+            jobConf.set(GCSConstants.MAX_BATCH_THREADS, String.valueOf(numberOfPartitions));
+        }
+
+        // recommended to be disabled by GCP hadoop team
+        jobConf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, ExternalDataConstants.FALSE);
+    }
+
+    /**
+     * Sets the credentials provider type and the credentials to hadoop based on the provided configuration
+     *
+     * @param jobConf hadoop job config
+     * @param configuration external details configuration
+     * @throws CompilationException CompilationException
+     */
+    private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration)
+            throws CompilationException {
+        AuthenticationType authenticationType = getAuthenticationType(configuration);
+        switch (authenticationType) {
+            case ANONYMOUS:
+                jobConf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
+                break;
+            case IMPERSONATE_SERVICE_ACCOUNT:
+                String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+                jobConf.set(IMPERSONATE_SERVICE_ACCOUNT, impersonateServiceAccount);
+                setJsonCredentials(jobConf, configuration);
+                break;
+            case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS:
+                setJsonCredentials(jobConf, configuration);
+                break;
+            case BAD_AUTHENTICATION:
+                throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+        }
+    }
+
+    /**
+     * Sets the Json credentials to hadoop job configuration
+     * Note:
+     * Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
+     * in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in
+     * version 3.x.y, which also removed support for hadoop-2
+     *
+     * @param jobConf hadoop job config
+     * @param configuration external details configuration
+     * @throws CompilationException CompilationException
+     */
+    private static void setJsonCredentials(JobConf jobConf, Map<String, String> configuration)
+            throws CompilationException {
+        try {
+            String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+            JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
+            jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
+                    jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
+            jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
+                    jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
+            jobConf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
+                    jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
+        } catch (JsonProcessingException e) {
+            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, "Unable to parse Json Credentials",
+                    getMessageOrToString(e));
+        }
+    }
+
+    private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
+        for (String fieldName : fieldNames) {
+            if (configuration.get(fieldName) != null) {
+                return fieldName;
+            }
+        }
+        return null;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 739dbde..19fd74f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -40,23 +40,21 @@
     }
 
     public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = "applicationDefaultCredentials";
+    public static final String IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME = "impersonateServiceAccount";
     public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
     public static final String ENDPOINT_FIELD_NAME = "endpoint";
     public static final String STORAGE_PREFIX = "prefix";
 
-    /*
-     * Hadoop internal configuration
-     */
+    // hadoop internal configuration
     public static final String HADOOP_GCS_PROTOCOL = "gs";
+    public static final String MAX_BATCH_THREADS = "fs.gs.batch.threads";
+    public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
+    public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable";
 
     // hadoop credentials
     public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type";
     public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED";
 
-    // gs hadoop parameters
-    public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable";
-    public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
-
     public static class JsonCredentials {
         public static final String PRIVATE_KEY_ID = "private_key_id";
         public static final String PRIVATE_KEY = "private_key";
@@ -64,6 +62,7 @@
     }
 
     public static class HadoopAuthServiceAccount {
+        public static final String IMPERSONATE_SERVICE_ACCOUNT = "fs.gs.auth.impersonation.service.account";
         public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id";
         public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key";
         public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index d768c23..032f537 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -18,32 +18,20 @@
  */
 package org.apache.asterix.external.util.google.gcs;
 
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
-import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.buildClient;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -51,25 +39,15 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.json.JsonReadFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.BaseServiceException;
-import com.google.cloud.NoCredentials;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
 
 public class GCSUtils {
     private GCSUtils() {
@@ -77,73 +55,17 @@
 
     }
 
-    private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper();
-    static {
-        JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
-    }
-
-    /**
-     * Builds the client using the provided configuration
-     *
-     * @param configuration properties
-     * @return clientasterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
-     * @throws CompilationException CompilationException
-     */
-    public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
-        String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
-        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
-        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
-        StorageOptions.Builder builder = StorageOptions.newBuilder();
-        builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
-
-        // default credentials provider
-        if (applicationDefaultCredentials != null) {
-            // only "true" value is allowed
-            if (!applicationDefaultCredentials.equalsIgnoreCase("true")) {
-                throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
-                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, "true");
-            }
-
-            // no other authentication parameters are allowed
-            if (jsonCredentials != null) {
-                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, JSON_CREDENTIALS_FIELD_NAME,
-                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
-            }
-
-            try {
-                builder.setCredentials(GoogleCredentials.getApplicationDefault());
-            } catch (Exception ex) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
-            }
-        } else if (jsonCredentials != null) {
-            try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
-                builder.setCredentials(GoogleCredentials.fromStream(credentialsStream));
-            } catch (IOException ex) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
-            } catch (Exception ex) {
-                throw new CompilationException(EXTERNAL_SOURCE_ERROR,
-                        "Encountered an issue while processing the JSON credentials. Please ensure the provided credentials are valid.");
-            }
-        } else {
-            builder.setCredentials(NoCredentials.getInstance());
-        }
-
-        if (endpoint != null) {
-            builder.setHost(endpoint);
-        }
-
-        return builder.build().getService();
-    }
-
     /**
      * Validate external dataset properties
      *
+     * @param appCtx application context
      * @param configuration properties
+     * @param srcLoc source location
+     * @param collector warning collector
      * @throws CompilationException Compilation exception
      */
-    public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
+    public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration,
+            SourceLocation srcLoc, IWarningCollector collector) throws CompilationException {
         if (isDeltaTable(configuration)) {
             validateDeltaTableProperties(configuration);
         }
@@ -154,7 +76,6 @@
 
         validateIncludeExclude(configuration);
         try {
-            // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
             new ExternalDataPrefix(configuration);
         } catch (AlgebricksException ex) {
             throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
@@ -162,36 +83,33 @@
 
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 
-        try {
+        try (Storage storage = buildClient(appCtx, configuration)) {
             Storage.BlobListOption limitOption = Storage.BlobListOption.pageSize(1);
             Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(getPrefix(configuration));
-            Storage storage = buildClient(configuration);
             Page<Blob> items = storage.list(container, limitOption, prefixOption);
 
             if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
                 Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
                 collector.warn(warning);
             }
-        } catch (CompilationException ex) {
-            throw ex;
         } catch (Exception ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
         }
     }
 
-    public static List<Blob> listItems(Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
-            IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
-            IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
+    public static List<Blob> listItems(IApplicationContext appCtx, Map<String, String> configuration,
+            IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator)
+            throws CompilationException, HyracksDataException {
         // Prepare to retrieve the objects
         List<Blob> filesOnly = new ArrayList<>();
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        Storage gcs = buildClient(configuration);
         Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
         Page<Blob> items;
 
-        try {
+        try (Storage gcs = buildClient(appCtx, configuration)) {
             items = gcs.list(container, options);
-        } catch (BaseServiceException ex) {
+        } catch (Exception ex) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
         }
 
@@ -224,61 +142,6 @@
         }
     }
 
-    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
-            throws AlgebricksException {
-        configureHdfsJobConf(conf, configuration, 0);
-    }
-
-    /**
-     * Builds the client using the provided configuration
-     *
-     * @param configuration      properties
-     * @param numberOfPartitions number of partitions in the cluster
-     */
-    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
-            throws AlgebricksException {
-        String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
-        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
-        // disable caching FileSystem
-        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_GCS_PROTOCOL);
-
-        // TODO(htowaileb): needs further testing, recommended to disable by gcs-hadoop team
-        conf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, ExternalDataConstants.FALSE);
-
-        // TODO(htowaileb): needs further testing
-        // set number of threads
-        //        conf.set(GCSConstants.HADOOP_MAX_REQUESTS_PER_BATCH, String.valueOf(numberOfPartitions));
-        //        conf.set(GCSConstants.HADOOP_BATCH_THREADS, String.valueOf(numberOfPartitions));
-
-        // authentication method
-        if (jsonCredentials == null) {
-            // anonymous access
-            conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
-        } else {
-            try {
-                JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
-                // Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
-                // in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in
-                // version 3.x.y, which also removed support for hadoop-2
-                conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
-                        jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
-                conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
-                        jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
-                conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
-                        jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
-            } catch (JsonProcessingException e) {
-                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials",
-                        getMessageOrToString(e));
-            }
-        }
-
-        // set endpoint if provided, default is https://storage.googleapis.com/
-        if (endpoint != null) {
-            conf.set(HADOOP_ENDPOINT, endpoint);
-        }
-    }
-
     public static String getPath(Map<String, String> configuration) {
         return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'