[ASTERIXDB-3514][EXT]: Assume role only when temporary credentials expire

- user model changes: no
- storage format changes: no
- interface changes: yes

When using temporary credentials from assuming
a role, cache the credentials and only refresh
them when they expired.

Ext-ref: MB-63505
Change-Id: I622853b794f81cd4bda84964a6cf7041d889d20f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19067
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 6ea7aeb..e892d04 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -24,6 +24,8 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -55,6 +57,8 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
 import org.apache.asterix.common.external.IAdapterFactoryService;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.metadata.IMetadataLockUtil;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -127,6 +131,8 @@
     private final IOManager ioManager;
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
+    private final IExternalCredentialsCache externalCredentialsCache;
+    private final IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
 
     public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -177,6 +183,8 @@
         this.globalTxManager = globalTxManager;
         this.ioManager = ioManager;
         dataPartitioningProvider = DataPartitioningProvider.create(this);
+        externalCredentialsCache = new ExternalCredentialsCache();
+        externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
     }
 
     @Override
@@ -415,4 +423,14 @@
     public IOManager getIoManager() {
         return ioManager;
     }
+
+    @Override
+    public IExternalCredentialsCache getExternalCredentialsCache() {
+        return externalCredentialsCache;
+    }
+
+    @Override
+    public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() {
+        return externalCredentialsCacheUpdater;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
new file mode 100644
index 0000000..0ddca4e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class ExternalCredentialsCache implements IExternalCredentialsCache {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
+
+    public ExternalCredentialsCache() {
+    }
+
+    @Override
+    public synchronized Object getCredentials(Map<String, String> configuration) {
+        String name = getName(configuration);
+        if (cache.containsKey(name) && !needsRefresh(cache.get(name).getLeft())) {
+            return cache.get(name).getRight();
+        }
+        return null;
+    }
+
+    @Override
+    public synchronized void updateCache(Map<String, String> configuration, Map<String, String> credentials) {
+        String type = configuration.get(ExternalDataConstants.KEY_READER);
+        if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
+            updateAwsCache(configuration, credentials);
+        }
+    }
+
+    @Override
+    public String getName(Map<String, String> configuration) {
+        String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
+        if (database == null) {
+            database = MetadataConstants.DEFAULT_DATABASE;
+        }
+        String dataverse = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
+        String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
+        return String.join(".", database, dataverse, dataset);
+    }
+
+    private void updateAwsCache(Map<String, String> configuration, Map<String, String> credentials) {
+        String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
+        doUpdateAwsCache(configuration, AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+    }
+
+    private void doUpdateAwsCache(Map<String, String> configuration, AwsSessionCredentials credentials) {
+        // TODO(htowaileb): Set default expiration value
+        String name = getName(configuration);
+        cache.put(name, Pair.of(Span.start(15, TimeUnit.MINUTES), credentials));
+        LOGGER.info("Received and cached new credentials for {}", name);
+    }
+
+    /**
+     * Refresh if the remaining time is half or less than the total expiration time
+     *
+     * @param span expiration span
+     * @return true if the remaining time is half or less than the total expiration time, false otherwise
+     */
+    private boolean needsRefresh(Span span) {
+        // TODO(htowaileb): At what % (and should be configurable?) do we decide it's better to refresh credentials
+        return (double) span.remaining(TimeUnit.MINUTES) / span.getSpan(TimeUnit.MINUTES) < 0.5;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
new file mode 100644
index 0000000..f07caaa
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
+import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.RefreshAwsCredentialsRequest;
+import org.apache.asterix.app.message.RefreshAwsCredentialsResponse;
+import org.apache.asterix.app.message.UpdateAwsCredentialsCacheRequest;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCacheUpdater {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IApplicationContext appCtx;
+
+    public ExternalCredentialsCacheUpdater(IApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    public synchronized Object generateAndCacheCredentials(Map<String, String> configuration)
+            throws HyracksDataException, CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object credentials = cache.getCredentials(configuration);
+        if (credentials != null) {
+            return credentials;
+        }
+
+        /*
+         * if we are the CC, generate new creds and ask all NCs to update their cache
+         * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache
+         */
+        String name = cache.getName(configuration);
+        if (appCtx instanceof ICcApplicationContext) {
+            ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+            IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
+            if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+                throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
+            }
+
+            String accessKeyId;
+            String secretAccessKey;
+            String sessionToken;
+            Map<String, String> credentialsMap = new HashMap<>();
+            try {
+                LOGGER.info("attempting to update credentials for {}", name);
+                AwsCredentialsProvider newCredentials = S3AuthUtils.assumeRoleAndGetCredentials(configuration);
+                LOGGER.info("updated credentials successfully for {}", name);
+                AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) newCredentials.resolveCredentials();
+                accessKeyId = sessionCredentials.accessKeyId();
+                secretAccessKey = sessionCredentials.secretAccessKey();
+                sessionToken = sessionCredentials.sessionToken();
+            } catch (CompilationException ex) {
+                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                throw ex;
+            }
+
+            // credentials need refreshing
+            credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, accessKeyId);
+            credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, secretAccessKey);
+            credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, sessionToken);
+
+            // request all NCs to update their credentials cache with the latest creds
+            updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration);
+            cache.updateCache(configuration, credentialsMap);
+            credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+        } else {
+            NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+            MessageFuture messageFuture = broker.registerMessageFuture();
+            String nodeId = ((INCServiceContext) appCtx.getServiceContext()).getNodeId();
+            long futureId = messageFuture.getFutureId();
+            RefreshAwsCredentialsRequest request = new RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
+            try {
+                LOGGER.info("no valid credentials found for {}, requesting credentials from CC", name);
+                broker.sendMessageToPrimaryCC(request);
+                RefreshAwsCredentialsResponse response = (RefreshAwsCredentialsResponse) messageFuture
+                        .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                if (response.getFailure() != null) {
+                    throw HyracksDataException.create(response.getFailure());
+                }
+                credentials = AwsSessionCredentials.create(response.getAccessKeyId(), response.getSecretAccessKey(),
+                        response.getSessionToken());
+            } catch (Exception ex) {
+                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                throw HyracksDataException.create(ex);
+            } finally {
+                broker.deregisterMessageFuture(futureId);
+            }
+        }
+        return credentials;
+    }
+
+    private void updateNcsCredentialsCache(ICcApplicationContext appCtx, String name, Map<String, String> credentials,
+            Map<String, String> configuration) throws HyracksDataException {
+        final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+        CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        UpdateAwsCredentialsCacheRequest request = new UpdateAwsCredentialsCacheRequest(configuration, credentials);
+
+        try {
+            LOGGER.info("requesting all NCs to update their credentials for {}", name);
+            for (String nc : ncs) {
+                broker.sendApplicationMessageToNC(request, nc);
+            }
+        } catch (Exception e) {
+            LOGGER.info("failed to send message to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
new file mode 100644
index 0000000..32de92d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+public class RefreshAwsCredentialsRequest implements ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final String nodeId;
+    private final long reqId;
+    private final Map<String, String> configuration;
+
+    public RefreshAwsCredentialsRequest(String nodeId, long reqId, Map<String, String> configuration) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public final void handle(ICcApplicationContext appCtx) throws HyracksDataException {
+        try {
+            IExternalCredentialsCacheUpdater cacheUpdater = appCtx.getExternalCredentialsCacheUpdater();
+            Object credentials = cacheUpdater.generateAndCacheCredentials(configuration);
+            AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) credentials;
+
+            // respond with the credentials
+            RefreshAwsCredentialsResponse response =
+                    new RefreshAwsCredentialsResponse(reqId, sessionCredentials.accessKeyId(),
+                            sessionCredentials.secretAccessKey(), sessionCredentials.sessionToken(), null);
+            respond(appCtx, response);
+        } catch (Exception e) {
+            LOGGER.info("failed to refresh credentials", e);
+            RefreshAwsCredentialsResponse response = new RefreshAwsCredentialsResponse(reqId, null, null, null, e);
+            respond(appCtx, response);
+        }
+    }
+
+    private void respond(ICcApplicationContext appCtx, RefreshAwsCredentialsResponse response)
+            throws HyracksDataException {
+        CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            broker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.info("failed to send reply to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
new file mode 100644
index 0000000..9ea0e11
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+
+public class RefreshAwsCredentialsResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String accessKeyId;
+    private final String secretAccessKey;
+    private final String sessionToken;
+    private final Throwable failure;
+
+    public RefreshAwsCredentialsResponse(long reqId, String accessKeyId, String secretAccessKey, String sessionToken,
+            Throwable failure) {
+        this.reqId = reqId;
+        this.accessKeyId = accessKeyId;
+        this.secretAccessKey = secretAccessKey;
+        this.sessionToken = sessionToken;
+        this.failure = failure;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+
+    public String getAccessKeyId() {
+        return accessKeyId;
+    }
+
+    public String getSecretAccessKey() {
+        return secretAccessKey;
+    }
+
+    public String getSessionToken() {
+        return sessionToken;
+    }
+
+    public Throwable getFailure() {
+        return failure;
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
new file mode 100644
index 0000000..44d4c21
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+
+public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Map<String, String> configuration;
+    private final Map<String, String> credentials;
+
+    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, Map<String, String> credentials) {
+        this.configuration = configuration;
+        this.credentials = credentials;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        appCtx.getExternalCredentialsCache().updateCache(configuration, credentials);
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7da3838..8c2a0ab 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -32,6 +32,8 @@
 import java.util.concurrent.ExecutorService;
 
 import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.cloud.LocalPartitionBootstrapper;
@@ -63,6 +65,8 @@
 import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
 import org.apache.asterix.common.context.GlobalVirtualBufferCache;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -186,6 +190,8 @@
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
     private IDiskCacheMonitoringService diskCacheService;
+    protected IExternalCredentialsCache externalCredentialsCache;
+    protected IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory, INamespaceResolver namespaceResolver,
@@ -210,6 +216,8 @@
         cacheManager = new CacheManager();
         this.namespacePathResolver = namespacePathResolver;
         this.namespaceResolver = namespaceResolver;
+        this.externalCredentialsCache = new ExternalCredentialsCache();
+        this.externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
     }
 
     @Override
@@ -748,4 +756,14 @@
         return isCloudDeployment() ? storageProperties.getStoragePartitionsCount()
                 : ncServiceContext.getIoManager().getIODevices().size();
     }
+
+    @Override
+    public IExternalCredentialsCache getExternalCredentialsCache() {
+        return externalCredentialsCache;
+    }
+
+    @Override
+    public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() {
+        return externalCredentialsCacheUpdater;
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 589ee79..198b3ad 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.cloud.WriterSingleBufferProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -60,17 +61,17 @@
         writeBufferSize = externalConfig.getWriteBufferSize();
     }
 
-    abstract ICloudClient createCloudClient() throws CompilationException;
+    abstract ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException;
 
     abstract boolean isNoContainerFoundException(IOException e);
 
     abstract boolean isSdkException(Throwable e);
 
-    final void buildClient() throws HyracksDataException {
+    final void buildClient(IApplicationContext appCtx) throws HyracksDataException {
         try {
             synchronized (this) {
                 if (cloudClient == null) {
-                    cloudClient = createCloudClient();
+                    cloudClient = createCloudClient(appCtx);
                 }
             }
         } catch (CompilationException e) {
@@ -79,8 +80,8 @@
     }
 
     @Override
-    public final void validate() throws AlgebricksException {
-        ICloudClient testClient = createCloudClient();
+    public final void validate(IApplicationContext appCtx) throws AlgebricksException {
+        ICloudClient testClient = createCloudClient(appCtx);
         String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 
         if (bucket == null || bucket.isEmpty()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 886f20d..63a8366 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -61,7 +62,7 @@
     }
 
     @Override
-    ICloudClient createCloudClient() throws CompilationException {
+    ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
@@ -80,7 +81,7 @@
     @Override
     public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
-        buildClient();
+        buildClient(((IApplicationContext) context.getJobletContext().getServiceContext().getApplicationContext()));
         String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IExternalPrinter printer = printerFactory.createPrinter();
         IWarningCollector warningCollector = context.getWarningCollector();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index e07acc0..d268efd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -24,9 +24,10 @@
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
 import org.apache.asterix.runtime.writer.IExternalFileWriter;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -61,9 +62,9 @@
     }
 
     @Override
-    ICloudClient createCloudClient() throws CompilationException {
+    ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
         S3ClientConfig config = S3ClientConfig.of(configuration, writeBufferSize);
-        return new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration),
+        return new S3CloudClient(config, S3AuthUtils.buildAwsS3Client(appCtx, configuration),
                 ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
@@ -80,7 +81,7 @@
     @Override
     public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
             throws HyracksDataException {
-        buildClient();
+        buildClient(((IApplicationContext) context.getJobletContext().getServiceContext().getApplicationContext()));
         String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IExternalPrinter printer = printerFactory.createPrinter();
         IWarningCollector warningCollector = context.getWarningCollector();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index eabebf7..19e4ad7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -29,6 +29,8 @@
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -106,4 +108,14 @@
     INamespaceResolver getNamespaceResolver();
 
     INamespacePathResolver getNamespacePathResolver();
+
+    /**
+     * @return external credentials cache
+     */
+    IExternalCredentialsCache getExternalCredentialsCache();
+
+    /**
+     * @return external credentials cache updater
+     */
+    IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 0cf6eb2..35e0699 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -314,6 +314,8 @@
     MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
     STORAGE_SIZE_NOT_APPLICABLE_TO_TYPE(1210),
     COULD_NOT_CREATE_TOKENS(1211),
+    NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION(1212),
+    FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION(1213),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
new file mode 100644
index 0000000..245b350
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+public interface IExternalCredentialsCache {
+
+    /**
+     * Returns the cached credentials. Can be of any supported external credentials type
+     *
+     * @param configuration configuration containing external collection details
+     * @return credentials if present, null otherwise
+     */
+    Object getCredentials(Map<String, String> configuration);
+
+    /**
+     * Updates the credentials cache with the provided credentials for the specified name
+     *
+     * @param configuration configuration containing external collection details
+     * @param credentials credentials to cache
+     */
+    void updateCache(Map<String, String> configuration, Map<String, String> credentials);
+
+    /**
+     * Returns the name of the entity which the cached credentials belong to
+     *
+     * @param configuration configuration containing external collection details
+     * @return name of entity which credentials belong to
+     */
+    String getName(Map<String, String> configuration);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
new file mode 100644
index 0000000..48553c0
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IExternalCredentialsCacheUpdater {
+
+    /**
+     * Generates new credentials and caches them
+     *
+     * @param configuration configuration containing external collection details
+     */
+    Object generateAndCacheCredentials(Map<String, String> configuration)
+            throws HyracksDataException, CompilationException;
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index adb26fe..d15a751 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -316,6 +316,8 @@
 1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
 1210 = Retrieving storage size is not applicable to type: %1$s.
 1211 = Could not create delegation tokens
+1212 = No credentials found for cross-account authentication. Expected instance profile or access key id & secret access key for assuming role
+1213 = Failed to perform cross-account authentication. Encountered error : '%1$s'
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 7a1bdad..138b364 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -26,13 +26,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -48,14 +49,16 @@
 public class AwsS3InputStream extends AbstractExternalInputStream {
 
     // Configuration
+    private final IApplicationContext ncAppCtx;
     private final String bucket;
     private final S3Client s3Client;
     private ResponseInputStream<?> s3InStream;
     private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
-    public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths,
+    public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths,
             IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
         super(configuration, filePaths, valueEmbedder);
+        this.ncAppCtx = ncAppCtx;
         this.s3Client = buildAwsS3Client(configuration);
         this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
     }
@@ -113,7 +116,7 @@
     }
 
     private boolean shouldRetry(String errorCode, int currentRetry) {
-        return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode);
+        return currentRetry < MAX_RETRIES && S3AuthUtils.isRetryableError(errorCode);
     }
 
     @Override
@@ -141,7 +144,7 @@
 
     private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
         try {
-            return S3Utils.buildAwsS3Client(configuration);
+            return S3AuthUtils.buildAwsS3Client(ncAppCtx, configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 36d21d1..e9c72e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.PriorityQueue;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,8 +48,10 @@
     public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
         IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
         int partition = context.getPartition();
-        return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
-                valueEmbedder);
+        IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        return new AwsS3InputStream(ncAppCtx, configuration,
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
     }
 
     @Override
@@ -65,7 +68,8 @@
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
 
         // get the items
-        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+        IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
+        List<S3Object> filesOnly = S3Utils.listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector,
                 externalDataPrefix, evaluator);
 
         // Distribute work load amongst the partitions
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index abed33a..7ddbab91 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.parquet;
 
-import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -73,7 +74,8 @@
             configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
 
             String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-            List<S3Object> filesOnly = listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+            IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+            List<S3Object> filesOnly = listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector,
                     externalDataPrefix, evaluator);
             path = buildPathURIs(container, filesOnly);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e758f64..1de2cd2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -39,6 +39,7 @@
     // used to specify the stream factory for an adapter that has a stream data source
     public static final String KEY_STREAM = "stream";
     //TODO(DB): check adapter configuration
+    public static final String KEY_DATASET = "dataset";
     public static final String KEY_DATASET_DATABASE = "dataset-database";
     // used to specify the dataverse of the adapter
     public static final String KEY_DATASET_DATAVERSE = "dataset-dataverse";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8c72a8c..1d5f2ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -74,8 +74,8 @@
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
 import org.apache.asterix.external.util.google.gcs.GCSConstants;
 import org.apache.asterix.om.types.ARecordType;
@@ -675,7 +675,7 @@
 
         switch (type) {
             case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
-                S3Utils.validateProperties(configuration, srcLoc, collector);
+                S3AuthUtils.validateProperties(appCtx, configuration, srcLoc, collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
                 validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
new file mode 100644
index 0000000..2ba1844
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.CleanupUtils;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Response;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+public class S3AuthUtils {
+    private S3AuthUtils() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static boolean isRetryableError(String errorCode) {
+        return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+    }
+
+    /**
+     * Builds the S3 client using the provided configuration
+     *
+     * @param configuration properties
+     * @return S3 client
+     * @throws CompilationException CompilationException
+     */
+    public static S3Client buildAwsS3Client(IApplicationContext appCtx, Map<String, String> configuration)
+            throws CompilationException {
+        String regionId = configuration.get(REGION_FIELD_NAME);
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+        Region region = validateAndGetRegion(regionId);
+        AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration);
+
+        S3ClientBuilder builder = S3Client.builder();
+        builder.region(region);
+        builder.crossRegionAccessEnabled(true);
+        builder.credentialsProvider(credentialsProvider);
+
+        // Validate the service endpoint if present
+        if (serviceEndpoint != null) {
+            try {
+                URI uri = new URI(serviceEndpoint);
+                try {
+                    builder.endpointOverride(uri);
+                } catch (NullPointerException ex) {
+                    throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+                }
+            } catch (URISyntaxException ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                        String.format("Invalid service endpoint %s", serviceEndpoint));
+            }
+        }
+
+        return builder.build();
+    }
+
+    public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+
+        if (noAuth(configuration)) {
+            return AnonymousCredentialsProvider.create();
+        } else if (arnRole != null) {
+            return getTrustAccountCredentials(appCtx, configuration);
+        } else if (instanceProfile != null) {
+            return getInstanceProfileCredentials(configuration);
+        } else if (accessKeyId != null || secretAccessKey != null) {
+            return getAccessKeyCredentials(configuration);
+        } else {
+            if (externalId != null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+                        EXTERNAL_ID_FIELD_NAME);
+            } else {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                        SESSION_TOKEN_FIELD_NAME);
+            }
+        }
+    }
+
+    public static Region validateAndGetRegion(String regionId) throws CompilationException {
+        List<Region> regions = S3Client.serviceMetadata().regions();
+        Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
+
+        if (selectedRegion.isEmpty()) {
+            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
+        }
+        return selectedRegion.get();
+    }
+
+    private static boolean noAuth(Map<String, String> configuration) {
+        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
+                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null;
+    }
+
+    /**
+     * Returns the cached credentials if valid, otherwise, generates new credentials by assume a role
+     *
+     * @param appCtx application context
+     * @param configuration configuration
+     * @return returns the cached credentials if valid, otherwise, generates new credentials by assume a role
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object credentialsObject = cache.getCredentials(configuration);
+        if (credentialsObject != null) {
+            return () -> (AwsSessionCredentials) credentialsObject;
+        }
+        IExternalCredentialsCacheUpdater cacheUpdater = appCtx.getExternalCredentialsCacheUpdater();
+        AwsSessionCredentials credentials;
+        try {
+            credentials = (AwsSessionCredentials) cacheUpdater.generateAndCacheCredentials(configuration);
+        } catch (HyracksDataException ex) {
+            throw new CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, ex, ex.getMessage());
+        }
+
+        return () -> credentials;
+    }
+
+    /**
+     * Assume role using provided credentials and return the new credentials
+     *
+     * @param configuration configuration
+     * @return return credentials from the assume role
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider assumeRoleAndGetCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        String regionId = configuration.get(REGION_FIELD_NAME);
+        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+        Region region = validateAndGetRegion(regionId);
+
+        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
+        builder.roleArn(arnRole);
+        builder.roleSessionName(UUID.randomUUID().toString());
+        builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 900 to 43200 (15 mins to 12 hours)
+        if (externalId != null) {
+            builder.externalId(externalId);
+        }
+
+        // credentials to be used to assume the role
+        AwsCredentialsProvider credentialsProvider;
+        AssumeRoleRequest request = builder.build();
+        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        if ("true".equalsIgnoreCase(instanceProfile)) {
+            credentialsProvider = getInstanceProfileCredentials(configuration, true);
+        } else if (accessKeyId != null && secretAccessKey != null) {
+            credentialsProvider = getAccessKeyCredentials(configuration, true);
+        } else {
+            throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+        }
+
+        // assume the role from the provided arn
+        try (StsClient stsClient =
+                StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) {
+            AssumeRoleResponse response = stsClient.assumeRole(request);
+            Credentials credentials = response.credentials();
+            return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
+                    credentials.secretAccessKey(), credentials.sessionToken()));
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        }
+    }
+
+    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        return getInstanceProfileCredentials(configuration, false);
+    }
+
+    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+
+        // only "true" value is allowed
+        if (!"true".equalsIgnoreCase(instanceProfile)) {
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
+        }
+
+        if (!assumeRoleAuthentication) {
+            String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+                    SESSION_TOKEN_FIELD_NAME);
+            if (notAllowed != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        INSTANCE_PROFILE_FIELD_NAME);
+            }
+        }
+        return InstanceProfileCredentialsProvider.create();
+    }
+
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration)
+            throws CompilationException {
+        return getAccessKeyCredentials(configuration, false);
+    }
+
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+
+        if (accessKeyId == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                    SECRET_ACCESS_KEY_FIELD_NAME);
+        }
+        if (secretAccessKey == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+                    ACCESS_KEY_ID_FIELD_NAME);
+        }
+
+        if (!assumeRoleAuthentication) {
+            String notAllowed = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
+            if (notAllowed != null) {
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        INSTANCE_PROFILE_FIELD_NAME);
+            }
+        }
+
+        // use session token if provided
+        if (sessionToken != null) {
+            return StaticCredentialsProvider
+                    .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+        } else {
+            return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+        }
+    }
+
+    private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
+        for (String fieldName : fieldNames) {
+            if (configuration.get(fieldName) != null) {
+                return fieldName;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Builds the S3 client using the provided configuration
+     *
+     * @param configuration      properties
+     * @param numberOfPartitions number of partitions in the cluster
+     */
+    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
+            int numberOfPartitions) {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+        //Disable caching S3 FileSystem
+        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
+
+        /*
+         * Authentication Methods:
+         * 1- Anonymous: no accessKeyId and no secretAccessKey
+         * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
+         * 3- Private: has to provide accessKeyId and secretAccessKey
+         */
+        if (accessKeyId == null) {
+            //Tells hadoop-aws it is an anonymous access
+            conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
+        } else {
+            conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
+            conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
+            if (sessionToken != null) {
+                conf.set(HADOOP_SESSION_TOKEN, sessionToken);
+                //Tells hadoop-aws it is a temporary access
+                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
+            }
+        }
+
+        /*
+         * This is to allow S3 definition to have path-style form. Should always be true to match the current
+         * way we access files in S3
+         */
+        conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
+
+        /*
+         * Set the size of S3 connection pool to be the number of partitions
+         */
+        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+
+        if (serviceEndpoint != null) {
+            // Validation of the URL should be done at hadoop-aws level
+            conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        } else {
+            //Region is ignored and buckets could be found by the central endpoint
+            conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
+        }
+    }
+
+    /**
+     * Validate external dataset properties
+     *
+     * @param configuration properties
+     * @throws CompilationException Compilation exception
+     */
+    public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration,
+            SourceLocation srcLoc, IWarningCollector collector) throws CompilationException {
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
+        // check if the format property is present
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+        }
+
+        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+
+        if (arnRole != null) {
+            return;
+        } else if (externalId != null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+                    EXTERNAL_ID_FIELD_NAME);
+        } else if (accessKeyId == null || secretAccessKey == null) {
+            // If one is passed, the other is required
+            if (accessKeyId != null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+                        ACCESS_KEY_ID_FIELD_NAME);
+            } else if (secretAccessKey != null) {
+                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                        SECRET_ACCESS_KEY_FIELD_NAME);
+            }
+        }
+
+        validateIncludeExclude(configuration);
+        try {
+            // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
+            new ExternalDataPrefix(configuration);
+        } catch (AlgebricksException ex) {
+            throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
+        }
+
+        // Check if the bucket is present
+        S3Client s3Client = buildAwsS3Client(appCtx, configuration);
+        S3Response response;
+        boolean useOldApi = false;
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        String prefix = getPrefix(configuration);
+
+        try {
+            response = S3Utils.isBucketEmpty(s3Client, container, prefix, false);
+        } catch (S3Exception ex) {
+            // Method not implemented, try falling back to old API
+            try {
+                // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+                if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
+                    useOldApi = true;
+                    response = S3Utils.isBucketEmpty(s3Client, container, prefix, true);
+                } else {
+                    throw ex;
+                }
+            } catch (SdkException ex2) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex));
+            }
+        } catch (SdkException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        } finally {
+            if (s3Client != null) {
+                CleanupUtils.close(s3Client, null);
+            }
+        }
+
+        boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
+                : ((ListObjectsV2Response) response).contents().isEmpty();
+        if (isEmpty && collector.shouldWarn()) {
+            Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+            collector.warn(warning);
+        }
+
+        // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
+        // ensure coverage, check if the result is successful as well and not only catch exceptions
+        if (!response.sdkHttpResponse().isSuccessful()) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 3cfccb4..a2b50e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -18,48 +18,18 @@
  */
 package org.apache.asterix.external.util.aws.s3;
 
-import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -67,26 +37,13 @@
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.api.util.CleanupUtils;
 
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.CommonPrefix;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
@@ -96,233 +53,12 @@
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.S3Response;
 import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
 
 public class S3Utils {
     private S3Utils() {
         throw new AssertionError("do not instantiate");
     }
 
-    public static boolean isRetryableError(String errorCode) {
-        return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration properties
-     * @return S3 client
-     * @throws CompilationException CompilationException
-     */
-    public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        Region region = validateAndGetRegion(regionId);
-        AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(configuration);
-
-        S3ClientBuilder builder = S3Client.builder();
-        builder.region(region);
-        builder.credentialsProvider(credentialsProvider);
-
-        // Validate the service endpoint if present
-        if (serviceEndpoint != null) {
-            try {
-                URI uri = new URI(serviceEndpoint);
-                try {
-                    builder.endpointOverride(uri);
-                } catch (NullPointerException ex) {
-                    throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
-                }
-            } catch (URISyntaxException ex) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
-                        String.format("Invalid service endpoint %s", serviceEndpoint));
-            }
-        }
-
-        return builder.build();
-    }
-
-    public static AwsCredentialsProvider buildCredentialsProvider(Map<String, String> configuration)
-            throws CompilationException {
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (noAuth(configuration)) {
-            return AnonymousCredentialsProvider.create();
-        } else if (arnRole != null) {
-            // TODO: Do auth validation and use existing credentials if exist already, if not, assume the role
-            return validateAndGetTrustAccountAuthentication(configuration);
-        } else if (instanceProfile != null) {
-            return validateAndGetInstanceProfileAuthentication(configuration);
-        } else if (accessKeyId != null || secretAccessKey != null) {
-            return validateAndGetAccessKeysAuthentications(configuration);
-        } else {
-            if (externalId != null) {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
-                        EXTERNAL_ID_FIELD_NAME);
-            } else {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
-                        SESSION_TOKEN_FIELD_NAME);
-            }
-        }
-    }
-
-    /**
-     * Builds the S3 client using the provided configuration
-     *
-     * @param configuration      properties
-     * @param numberOfPartitions number of partitions in the cluster
-     */
-    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
-            int numberOfPartitions) {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        //Disable caching S3 FileSystem
-        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
-        /*
-         * Authentication Methods:
-         * 1- Anonymous: no accessKeyId and no secretAccessKey
-         * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
-         * 3- Private: has to provide accessKeyId and secretAccessKey
-         */
-        if (accessKeyId == null) {
-            //Tells hadoop-aws it is an anonymous access
-            conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
-        } else {
-            conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
-            conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
-            if (sessionToken != null) {
-                conf.set(HADOOP_SESSION_TOKEN, sessionToken);
-                //Tells hadoop-aws it is a temporary access
-                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
-            }
-        }
-
-        /*
-         * This is to allow S3 definition to have path-style form. Should always be true to match the current
-         * way we access files in S3
-         */
-        conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
-        /*
-         * Set the size of S3 connection pool to be the number of partitions
-         */
-        conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
-
-        if (serviceEndpoint != null) {
-            // Validation of the URL should be done at hadoop-aws level
-            conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        } else {
-            //Region is ignored and buckets could be found by the central endpoint
-            conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
-        }
-    }
-
-    /**
-     * Validate external dataset properties
-     *
-     * @param configuration properties
-     * @throws CompilationException Compilation exception
-     */
-    public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
-            IWarningCollector collector) throws CompilationException {
-        if (isDeltaTable(configuration)) {
-            validateDeltaTableProperties(configuration);
-        }
-        // check if the format property is present
-        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
-            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
-        }
-
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (arnRole != null) {
-            String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                    SESSION_TOKEN_FIELD_NAME);
-            if (notAllowed != null) {
-                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                        INSTANCE_PROFILE_FIELD_NAME);
-            }
-        } else if (externalId != null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
-                    EXTERNAL_ID_FIELD_NAME);
-        } else if (accessKeyId == null || secretAccessKey == null) {
-            // If one is passed, the other is required
-            if (accessKeyId != null) {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
-                        ACCESS_KEY_ID_FIELD_NAME);
-            } else if (secretAccessKey != null) {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
-                        SECRET_ACCESS_KEY_FIELD_NAME);
-            }
-        }
-
-        validateIncludeExclude(configuration);
-        try {
-            // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
-            new ExternalDataPrefix(configuration);
-        } catch (AlgebricksException ex) {
-            throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
-        }
-
-        // Check if the bucket is present
-        S3Client s3Client = buildAwsS3Client(configuration);
-        S3Response response;
-        boolean useOldApi = false;
-        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        String prefix = getPrefix(configuration);
-
-        try {
-            response = isBucketEmpty(s3Client, container, prefix, false);
-        } catch (S3Exception ex) {
-            // Method not implemented, try falling back to old API
-            try {
-                // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    useOldApi = true;
-                    response = isBucketEmpty(s3Client, container, prefix, true);
-                } else {
-                    throw ex;
-                }
-            } catch (SdkException ex2) {
-                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex));
-            }
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
-        } finally {
-            if (s3Client != null) {
-                CleanupUtils.close(s3Client, null);
-            }
-        }
-
-        boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
-                : ((ListObjectsV2Response) response).contents().isEmpty();
-        if (isEmpty && collector.shouldWarn()) {
-            Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-            collector.warn(warning);
-        }
-
-        // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
-        // ensure coverage, check if the result is successful as well and not only catch exceptions
-        if (!response.sdkHttpResponse().isSuccessful()) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
-        }
-    }
-
     /**
      * Checks for a single object in the specified bucket to determine if the bucket is empty or not.
      *
@@ -332,7 +68,7 @@
      * @param useOldApi flag whether to use the old API or not
      * @return returns the S3 response
      */
-    private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
+    protected static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
         S3Response response;
         if (useOldApi) {
             ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
@@ -352,14 +88,14 @@
      * @param configuration         properties
      * @param includeExcludeMatcher include/exclude matchers to apply
      */
-    public static List<S3Object> listS3Objects(Map<String, String> configuration,
+    public static List<S3Object> listS3Objects(IApplicationContext appCtx, Map<String, String> configuration,
             AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
             IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
             IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        S3Client s3Client = buildAwsS3Client(configuration);
+        S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, configuration);
         String prefix = getPrefix(configuration);
 
         try {
@@ -502,10 +238,11 @@
         }
     }
 
-    public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String, String> configuration, String container,
-            String prefix) throws CompilationException {
+    public static Map<String, List<String>> S3ObjectsOfSingleDepth(IApplicationContext appCtx,
+            Map<String, String> configuration, String container, String prefix)
+            throws CompilationException, HyracksDataException {
         // create s3 client
-        S3Client s3Client = buildAwsS3Client(configuration);
+        S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, configuration);
         // fetch all the s3 objects
         return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
     }
@@ -555,116 +292,4 @@
         allObjects.put("folders", folders);
         return allObjects;
     }
-
-    public static Region validateAndGetRegion(String regionId) throws CompilationException {
-        List<Region> regions = S3Client.serviceMetadata().regions();
-        Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
-
-        if (selectedRegion.isEmpty()) {
-            throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
-        }
-        return selectedRegion.get();
-    }
-
-    // TODO(htowaileb): Currently, trust-account is always assuming we have instance profile setup in place
-    private static AwsCredentialsProvider validateAndGetTrustAccountAuthentication(Map<String, String> configuration)
-            throws CompilationException {
-        String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-
-        String regionId = configuration.get(REGION_FIELD_NAME);
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        Region region = validateAndGetRegion(regionId);
-
-        AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
-        builder.roleArn(arnRole);
-        builder.roleSessionName(UUID.randomUUID().toString());
-        builder.durationSeconds(900); // minimum role assume duration = 900 seconds (15 minutes), make configurable?
-        if (externalId != null) {
-            builder.externalId(externalId);
-        }
-        AssumeRoleRequest request = builder.build();
-        AwsCredentialsProvider credentialsProvider = validateAndGetInstanceProfileAuthentication(configuration);
-
-        // TODO(htowaileb): We shouldn't assume role with each request, rather stored the received temporary credentials
-        // and refresh when expired.
-        // assume the role from the provided arn
-        try (StsClient stsClient =
-                StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) {
-            AssumeRoleResponse response = stsClient.assumeRole(request);
-            Credentials credentials = response.credentials();
-            return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
-                    credentials.secretAccessKey(), credentials.sessionToken()));
-        } catch (SdkException ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
-        }
-    }
-
-    private static AwsCredentialsProvider validateAndGetInstanceProfileAuthentication(Map<String, String> configuration)
-            throws CompilationException {
-        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
-        // only "true" value is allowed
-        if (!"true".equalsIgnoreCase(instanceProfile)) {
-            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
-        }
-
-        String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
-                SESSION_TOKEN_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-        return InstanceProfileCredentialsProvider.create();
-    }
-
-    private static AwsCredentialsProvider validateAndGetAccessKeysAuthentications(Map<String, String> configuration)
-            throws CompilationException {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
-        // accessKeyId authentication
-        if (accessKeyId == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
-                    SECRET_ACCESS_KEY_FIELD_NAME);
-        }
-        if (secretAccessKey == null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
-                    ACCESS_KEY_ID_FIELD_NAME);
-        }
-
-        String notAllowed = getNonNull(configuration, EXTERNAL_ID_FIELD_NAME);
-        if (notAllowed != null) {
-            throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
-                    INSTANCE_PROFILE_FIELD_NAME);
-        }
-
-        // use session token if provided
-        if (sessionToken != null) {
-            return StaticCredentialsProvider
-                    .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
-        } else {
-            return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
-        }
-    }
-
-    private static boolean noAuth(Map<String, String> configuration) {
-        return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
-                ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null;
-    }
-
-    private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
-        for (String fieldName : fieldNames) {
-            if (configuration.get(fieldName) != null) {
-                return fieldName;
-            }
-        }
-        return null;
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index dc20a89..a4113d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -123,7 +124,7 @@
     }
 
     @Override
-    public void validate() throws AlgebricksException {
+    public void validate(IApplicationContext appCtx) throws AlgebricksException {
         Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
         credentials = HDFSUtils.configureHadoopAuthentication(configuration, conf);
         try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index bdefaa8..b1d3a95 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -80,7 +81,7 @@
     }
 
     @Override
-    public void validate() throws AlgebricksException {
+    public void validate(IApplicationContext appCtx) throws AlgebricksException {
         // A special case validation for a single node cluster
         if (singleNodeCluster && staticPath != null) {
             if (isNonEmptyDirectory(new File(staticPath))) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c152853..ef056c9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -994,6 +994,7 @@
             Map<String, String> configuration, ARecordType itemType, IWarningCollector warningCollector,
             IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
         try {
+            configuration.put(ExternalDataConstants.KEY_DATASET, dataset.getDatasetName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
                     dataset.getDataverseName().getCanonicalForm());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 1caea58..e6716df 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -148,7 +148,7 @@
         String staticPath = staticPathExpr != null ? ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
         IExternalFileWriterFactory fileWriterFactory =
                 ExternalWriterProvider.createWriterFactory(appCtx, sink, staticPath, pathSourceLocation);
-        fileWriterFactory.validate();
+        fileWriterFactory.validate(appCtx);
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
index 4a75db6..2aeca4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
@@ -18,11 +18,12 @@
  */
 package org.apache.asterix.runtime.writer;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 public interface IExternalWriterFactoryValidator {
     /**
      * Perform the necessary validation to ensure the writer has the proper permissions
      */
-    void validate() throws AlgebricksException;
+    void validate(IApplicationContext appCtx) throws AlgebricksException;
 }