[ASTERIXDB-3514][EXT]: Pass entity-id for all creds required operations

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Pass entity-id to uniquely identify the cached
  credentials for all external operations.

Ext-ref: MB-63505
Change-Id: If5bd9ed7c1fc0aedba7f7bb90acb48c2edf3d580
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19365
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index 8cb3a47..ed26f7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -24,14 +24,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.common.metadata.IFullyQualifiedName;
-import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.commons.lang3.tuple.Pair;
@@ -46,41 +39,35 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
     private final int awsAssumeRoleDuration;
-    private final double refreshAwsAssumeRoleThreshold;
+    private final int refreshAwsAssumeRoleThresholdPercentage;
 
     public ExternalCredentialsCache(IApplicationContext appCtx) {
         this.awsAssumeRoleDuration = appCtx.getExternalProperties().getAwsAssumeRoleDuration();
-        this.refreshAwsAssumeRoleThreshold = appCtx.getExternalProperties().getAwsRefreshAssumeRoleThreshold();
+        this.refreshAwsAssumeRoleThresholdPercentage =
+                appCtx.getExternalProperties().getAwsRefreshAssumeRoleThresholdPercentage();
     }
 
     @Override
-    public synchronized Object getCredentials(Map<String, String> configuration) throws CompilationException {
-        IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration);
-        return getCredentials(fqn);
-    }
-
-    @Override
-    public synchronized Object getCredentials(IFullyQualifiedName fqn) {
-        String name = getName(fqn);
-        if (cache.containsKey(name) && !needsRefresh(cache.get(name).getLeft())) {
-            return cache.get(name).getRight();
+    public synchronized Object get(String key) {
+        invalidateCache();
+        if (cache.containsKey(key)) {
+            return cache.get(key).getRight();
         }
         return null;
     }
 
     @Override
-    public synchronized void updateCache(Map<String, String> configuration, Map<String, String> credentials)
-            throws CompilationException {
-        IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration);
-        String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
-        updateCache(fqn, type, credentials);
+    public void delete(String key) {
+        Object removed = cache.remove(key);
+        if (removed != null) {
+            LOGGER.info("Removed cached credentials for {} because it got deleted", key);
+        }
     }
 
     @Override
-    public synchronized void updateCache(IFullyQualifiedName fqn, String type, Map<String, String> credentials) {
-        String name = getName(fqn);
+    public synchronized void put(String key, String type, Map<String, String> credentials) {
         if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
-            updateAwsCache(name, credentials);
+            updateAwsCache(key, credentials);
         }
     }
 
@@ -88,34 +75,23 @@
         String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
         String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
-        doUpdateAwsCache(name, AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
-    }
 
-    private void doUpdateAwsCache(String name, AwsSessionCredentials credentials) {
-        cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials));
+        AwsSessionCredentials sessionCreds = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+        cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), sessionCreds));
         LOGGER.info("Received and cached new credentials for {}", name);
     }
 
-    @Override
-    public void deleteCredentials(IFullyQualifiedName fqn) {
-        String name = getName(fqn);
-        Object removed = cache.remove(name);
-        if (removed != null) {
-            LOGGER.info("Removed cached credentials for {}", name);
-        } else {
-            LOGGER.info("No cached credentials found for {}, nothing to remove", name);
-        }
-    }
-
-    @Override
-    public String getName(Map<String, String> configuration) throws CompilationException {
-        IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration);
-        return getName(fqn);
-    }
-
-    @Override
-    public String getName(IFullyQualifiedName fqn) {
-        return fqn.toString();
+    /**
+     * Iterates the cache and removes the credentials that are considered expired
+     */
+    private void invalidateCache() {
+        cache.entrySet().removeIf(entry -> {
+            boolean shouldRemove = needsRefresh(entry.getValue().getLeft());
+            if (shouldRemove) {
+                LOGGER.info("Removing cached credentials for {} because it expired", entry.getKey());
+            }
+            return shouldRemove;
+        });
     }
 
     /**
@@ -125,27 +101,9 @@
      * @return true if the remaining time is less than the configured refresh percentage, false otherwise
      */
     private boolean needsRefresh(Span span) {
-        return (double) span.remaining(TimeUnit.SECONDS)
-                / span.getSpan(TimeUnit.SECONDS) < refreshAwsAssumeRoleThreshold;
-    }
-
-    protected IFullyQualifiedName getFullyQualifiedNameFromConfiguration(Map<String, String> configuration)
-            throws CompilationException {
-        String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
-        if (database == null) {
-            database = MetadataConstants.DEFAULT_DATABASE;
-        }
-        String stringDataverse = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
-        DataverseName dataverse = getDataverseName(stringDataverse);
-        String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
-        return new DatasetFullyQualifiedName(database, dataverse, dataset);
-    }
-
-    protected DataverseName getDataverseName(String dataverse) throws CompilationException {
-        try {
-            return DataverseName.createSinglePartName(dataverse);
-        } catch (AsterixException ex) {
-            throw new CompilationException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, dataverse);
-        }
+        double remaining = (double) span.remaining(TimeUnit.SECONDS) / span.getSpan(TimeUnit.SECONDS);
+        double passed = 1 - remaining;
+        int passedPercentage = (int) (passed * 100);
+        return passedPercentage > refreshAwsAssumeRoleThresholdPercentage;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index f07caaa..29e3239 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -40,6 +40,7 @@
 import org.apache.asterix.common.external.IExternalCredentialsCache;
 import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.messaging.CCMessageBroker;
@@ -65,7 +66,8 @@
     public synchronized Object generateAndCacheCredentials(Map<String, String> configuration)
             throws HyracksDataException, CompilationException {
         IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
-        Object credentials = cache.getCredentials(configuration);
+        String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+        Object credentials = cache.get(name);
         if (credentials != null) {
             return credentials;
         }
@@ -74,9 +76,7 @@
          * if we are the CC, generate new creds and ask all NCs to update their cache
          * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache
          */
-        String name = cache.getName(configuration);
-        if (appCtx instanceof ICcApplicationContext) {
-            ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+        if (appCtx instanceof ICcApplicationContext ccAppCtx) {
             IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
             if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
                 throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
@@ -106,7 +106,8 @@
 
             // request all NCs to update their credentials cache with the latest creds
             updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration);
-            cache.updateCache(configuration, credentialsMap);
+            String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+            cache.put(name, type, credentialsMap);
             credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
         } else {
             NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index d1a9ebf..438e425 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -21,9 +21,8 @@
 import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -40,13 +39,10 @@
     }
 
     @Override
-    public void handle(INcApplicationContext appCtx) throws HyracksDataException {
-        try {
-            appCtx.getExternalCredentialsCache().updateCache(configuration, credentials);
-        } catch (CompilationException ex) {
-            LOGGER.info("Failed to process request", ex);
-            throw HyracksDataException.create(ex);
-        }
+    public void handle(INcApplicationContext appCtx) {
+        String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+        String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+        appCtx.getExternalCredentialsCache().put(name, type, credentials);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 51371da..97bc3cd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -274,6 +274,7 @@
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.api.result.ResultSetId;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -578,11 +579,17 @@
             // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
                 appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
+                postRequestCompleteCleanup(requestParameters);
             }
             Thread.currentThread().setName(threadName);
         }
     }
 
+    protected void postRequestCompleteCleanup(IRequestParameters requestParameters) {
+        String uuid = requestParameters.getRequestReference().getUuid();
+        appCtx.getExternalCredentialsCache().delete(uuid);
+    }
+
     protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config,
             Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters,
             Statement statement) {
@@ -1031,8 +1038,12 @@
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     ExternalDataUtils.validateType(properties, (ARecordType) itemType);
-                    validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
-                            appCtx, metadataProvider);
+                    Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties,
+                            dd.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider);
+                    // do any necessary validation on the copy to avoid changing the original and storing it in the metadata
+                    metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName, dataverseName,
+                            datasetName, false);
+                    validateAdapterSpecificProperties(propertiesCopy, dd.getSourceLocation(), appCtx);
                     datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
@@ -2448,9 +2459,7 @@
                     sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            if (ds.getDatasetType().equals(DatasetType.EXTERNAL)) {
-                appCtx.getExternalCredentialsCache().deleteCredentials(ds.getDatasetFullyQualifiedName());
-            }
+            deleteDatasetCachedCredentials(ds);
             return true;
         } catch (Exception e) {
             LOGGER.error("failed to drop dataset; executing compensating operations", e);
@@ -2498,6 +2507,10 @@
         }
     }
 
+    protected void deleteDatasetCachedCredentials(Dataset dataset) throws CompilationException {
+        appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString());
+    }
+
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
@@ -3637,7 +3650,7 @@
     protected CreateResult doCreateLibrary(MetadataProvider metadataProvider, String databaseName,
             DataverseName dataverseName, String libraryName, String libraryHash, CreateLibraryStatement cls,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
-        JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
+        ProgressState progress = ProgressState.NO_PROGRESS;
         boolean prepareJobSuccessful = false;
         JobSpecification abortJobSpec = null;
         Library existingLibrary = null;
@@ -3773,7 +3786,7 @@
     protected boolean doDropLibrary(MetadataProvider metadataProvider, LibraryDropStatement stmtDropLibrary,
             String databaseName, DataverseName dataverseName, String libraryName, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
-        JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
+        ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -4044,8 +4057,11 @@
                     copyStmt, itemType, mdTxnCtx, metadataProvider);
             ExternalDataUtils.normalize(properties);
             ExternalDataUtils.validate(properties);
-            validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
-                    appCtx, metadataProvider);
+            Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties,
+                    copyStmt.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider);
+            // do any necessary validation on the copy to avoid changing the original and storing it in the metadata
+            metadataProvider.setExternalEntityId(propertiesCopy, dataset);
+            validateAdapterSpecificProperties(propertiesCopy, copyStmt.getSourceLocation(), appCtx);
             CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName,
                     copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
             cls.setSourceLocation(stmt.getSourceLocation());
@@ -4102,6 +4118,9 @@
             ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams,
             Stats stats) throws Exception {
         CopyToStatement copyTo = (CopyToStatement) stmt;
+        Namespace namespace = copyTo.getNamespace();
+        DataverseName dataverseName = namespace.getDataverseName();
+        String databaseName = namespace.getDatabaseName();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4130,16 +4149,21 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             try {
                 ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
-                edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
+                Map<String, String> properties = createAndValidateAdapterConfigurationForCopyToStmt(edd,
                         ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
-                        metadataProvider));
+                        metadataProvider);
+
+                // request id is used to cache credentials if needed, and clear them after request is done
+                String uuid = requestParameters.getRequestReference().getUuid();
+                metadataProvider.setExternalEntityIdFromParts(properties, databaseName, dataverseName, uuid, true);
+                edd.setProperties(properties);
 
                 if (ExternalDataConstants.FORMAT_PARQUET
                         .equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
                     if (copyTo.getType() != null) {
-                        DataverseName dataverseName =
+                        DataverseName dummyDataverse =
                                 DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
-                        IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+                        IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
                                 ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
                         edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
                                 SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
@@ -4148,14 +4172,14 @@
 
                 if (edd.getProperties().get(ExternalDataConstants.KEY_FORMAT)
                         .equalsIgnoreCase(ExternalDataConstants.FORMAT_CSV_LOWER_CASE)) {
-                    DataverseName dataverseName =
+                    DataverseName dummyDataverse =
                             DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
                     IAType iaType;
                     if (copyTo.getType() != null) {
-                        iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+                        iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
                                 ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
                     } else if (copyTo.getTypeExpressionItemType() != null) {
-                        iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dataverseName,
+                        iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
                                 ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getTypeExpressionItemType(), mdTxnCtx);
                     } else {
                         throw new CompilationException(ErrorCode.COMPILATION_ERROR,
@@ -5552,7 +5576,7 @@
             if (atomic && jobId != null) {
                 globalTxManager.abortTransaction(jobId);
             }
-            if (org.apache.hyracks.api.util.ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
+            if (ExceptionUtils.getRootCause(e) instanceof InterruptedException) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
             }
@@ -5561,6 +5585,7 @@
             // complete async jobs after their job completes
             if (ResultDelivery.ASYNC == resultDelivery) {
                 requestTracker.complete(clientRequest.getId());
+                postRequestCompleteCleanup(requestParameters);
             }
             locker.unlock();
         }
@@ -5777,33 +5802,39 @@
      * @param details external details
      * @param sourceLoc source location
      */
-    private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) throws CompilationException {
+    protected void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc)
+            throws CompilationException {
         String adapter = details.getAdapter();
-        Optional<String> normalizedAdapter = ExternalDataConstants.EXTERNAL_READ_ADAPTERS.stream()
-                .filter(k -> k.equalsIgnoreCase(adapter)).findFirst();
+        Optional<String> normalizedAdapter =
+                getSupportedAdapters().stream().filter(k -> k.equalsIgnoreCase(adapter)).findFirst();
         if (normalizedAdapter.isEmpty()) {
             throw CompilationException.create(ErrorCode.UNKNOWN_ADAPTER, sourceLoc, adapter);
         }
         details.setAdapter(normalizedAdapter.get());
     }
 
-    protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
+    protected Set<String> getSupportedAdapters() {
+        return ExternalDataConstants.EXTERNAL_READ_ADAPTERS;
+    }
+
+    protected Map<String, String> preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails,
             Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx,
             IApplicationContext appCtx, MetadataProvider metadataProvider)
             throws AlgebricksException, HyracksDataException {
-        // Validate adapter specific properties
         normalizeAdapters(externalDetails, srcLoc);
         String adapter = externalDetails.getAdapter();
         Map<String, String> details = new HashMap<>(properties);
         details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
-        validateAdapterSpecificProperties(details, srcLoc, appCtx);
+        return details;
     }
 
     protected Map<String, String> createAndValidateAdapterConfigurationForCopyToStmt(
             ExternalDetailsDecl externalDetailsDecl, Set<String> supportedAdapters, SourceLocation sourceLocation,
             MetadataTransactionContext mdTxnCtx, MetadataProvider md) throws AlgebricksException {
+        normalizeAdapters(externalDetailsDecl, sourceLocation);
         String adapterName = externalDetailsDecl.getAdapter();
         Map<String, String> properties = externalDetailsDecl.getProperties();
+        properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName);
         WriterValidationUtil.validateWriterConfiguration(adapterName, supportedAdapters, properties, sourceLocation);
         return properties;
     }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 29a4a6e..3ffb080 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -9,7 +9,7 @@
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
     "aws.assume.role.duration" : 900,
-    "aws.refresh.assume.role.threshold" : 0.5,
+    "aws.refresh.assume.role.threshold.percentage" : 75,
     "azure.request.timeout" : 120,
     "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index dbbf83f..aeb3cea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -9,7 +9,7 @@
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
     "aws.assume.role.duration" : 900,
-    "aws.refresh.assume.role.threshold" : 0.5,
+    "aws.refresh.assume.role.threshold.percentage" : 75,
     "azure.request.timeout" : 120,
     "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 4778cb8..b475612 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -9,7 +9,7 @@
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
     "aws.assume.role.duration" : 900,
-    "aws.refresh.assume.role.threshold" : 0.5,
+    "aws.refresh.assume.role.threshold.percentage" : 75,
     "azure.request.timeout" : 120,
     "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index f6559f0..62437b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.common.config;
 
-import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
 import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL;
 import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType;
 
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
@@ -55,14 +55,16 @@
         LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"),
         AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds"),
         AWS_ASSUME_ROLE_DURATION(
-                POSITIVE_INTEGER,
+                getRangedIntegerType(900, 43200),
                 900,
                 "AWS assuming role duration in seconds. "
                         + "Range from 900 seconds (15 mins) to 43200 seconds (12 hours)"),
-        AWS_REFRESH_ASSUME_ROLE_THRESHOLD(
-                DOUBLE,
-                .5,
-                "Percentage of left duration before assume role credentials " + "needs to be refreshed");
+        AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE(
+                getRangedIntegerType(25, 90),
+                75,
+                "Percentage of duration passed before assume role credentials need to be refreshed, the value ranges "
+                        + "from 25 to 90, default is 75. For example, if the value is set to 65, this means the "
+                        + "credentials need to be refreshed if 65% of the total expiration duration is already passed");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -91,7 +93,7 @@
                 case LIBRARY_DEPLOY_TIMEOUT:
                 case AZURE_REQUEST_TIMEOUT:
                 case AWS_ASSUME_ROLE_DURATION:
-                case AWS_REFRESH_ASSUME_ROLE_THRESHOLD:
+                case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE:
                     return Section.COMMON;
                 case CC_JAVA_OPTS:
                 case NC_JAVA_OPTS:
@@ -177,7 +179,7 @@
         return accessor.getInt(Option.AWS_ASSUME_ROLE_DURATION);
     }
 
-    public double getAwsRefreshAssumeRoleThreshold() {
-        return accessor.getDouble(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD);
+    public int getAwsRefreshAssumeRoleThresholdPercentage() {
+        return accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 3ff444d..3a9ae1c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -20,64 +20,29 @@
 
 import java.util.Map;
 
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.metadata.IFullyQualifiedName;
-
 public interface IExternalCredentialsCache {
 
     /**
-     * Returns the cached credentials. Can be of any supported external credentials type
+     * Returns the cached credentials.
      *
-     * @param configuration configuration containing external collection details
-     * @return credentials if present, null otherwise
+     * @param key credentials key
+     * @return credentials if present and not expired/need refreshing, null otherwise
      */
-    Object getCredentials(Map<String, String> configuration) throws CompilationException;
+    Object get(String key);
 
     /**
-     * Returns the cached credentials. Can be of any supported external credentials type
+     * Deletes the cache for the provided entity
      *
-     * @param fqn fully qualified name of the credentials entity
-     * @return credentials if present, null otherwise
+     * @param key credentials key
      */
-    Object getCredentials(IFullyQualifiedName fqn) throws CompilationException;
+    void delete(String key);
 
     /**
      * Updates the credentials cache with the provided credentials for the specified name
      *
-     * @param configuration configuration containing external collection details
+     * @param key credentials key
+     * @param type credentials type
      * @param credentials credentials to cache
      */
-    void updateCache(Map<String, String> configuration, Map<String, String> credentials) throws CompilationException;
-
-    /**
-     * Updates the credentials cache with the provided credentials for the specified name
-     *
-     * @param fqn fully qualified name for the credentials entity
-     * @param type type of the entity
-     * @param credentials credentials to cache
-     */
-    void updateCache(IFullyQualifiedName fqn, String type, Map<String, String> credentials);
-
-    /**
-     * Deletes the cache for the provided enitty
-     *
-     * @param fqn fully qualified name of entity for which the credentials are to be deleted
-     */
-    void deleteCredentials(IFullyQualifiedName fqn);
-
-    /**
-     * Returns the name of the entity which the cached credentials belong to
-     *
-     * @param configuration configuration containing external collection details
-     * @return name of entity which credentials belong to
-     */
-    String getName(Map<String, String> configuration) throws CompilationException;
-
-    /**
-     * Returns the name of the entity which the cached credentials belong to
-     *
-     * @param fqn fully qualified name for the credentials entity
-     * @return name of entity which credentials belong to
-     */
-    String getName(IFullyQualifiedName fqn) throws CompilationException;
+    void put(String key, String type, Map<String, String> credentials);
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 4cc1656..9e9df8b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -105,6 +105,7 @@
     public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
     public static final String KEY_FEED_NAME = "feed";
     // a string representing external bucket name
+    public static final String KEY_ENTITY_ID = "entity-id";
     public static final String KEY_EXTERNAL_SOURCE_TYPE = "type";
     // a comma delimited list of nodes
     public static final String KEY_NODES = "nodes";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 57889ea..4e0a435 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -241,7 +241,7 @@
     public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx,
             Map<String, String> configuration) throws CompilationException {
         IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
-        Object credentialsObject = cache.getCredentials(configuration);
+        Object credentialsObject = cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID));
         if (credentialsObject != null) {
             return () -> (AwsSessionCredentials) credentialsObject;
         }
@@ -389,7 +389,7 @@
      */
     public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf,
             Map<String, String> configuration, int numberOfPartitions) throws CompilationException {
-        setHadoopCredentials(jobConf, configuration);
+        setHadoopCredentials(appCtx, jobConf, configuration);
         String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
         Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
         jobConf.set(HADOOP_REGION, region.toString());
@@ -421,7 +421,8 @@
      * @param jobConf hadoop job config
      * @param configuration external details configuration
      */
-    private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration) {
+    private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf,
+            Map<String, String> configuration) {
         AuthenticationType authenticationType = getAuthenticationType(configuration);
         switch (authenticationType) {
             case ANONYMOUS:
@@ -432,7 +433,11 @@
                 jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME));
                 jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME));
                 jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID());
-                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, "15m");
+
+                // hadoop accepts time 15m to 1h, we will base it on the provided configuration
+                int durationInSeconds = appCtx.getExternalProperties().getAwsAssumeRoleDuration();
+                String hadoopDuration = getHadoopDuration(durationInSeconds);
+                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, hadoopDuration);
 
                 // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role
                 jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE);
@@ -456,6 +461,36 @@
     }
 
     /**
+     * Hadoop accepts duration values from 15m to 1h (in this format). We will base this on the configured
+     * duration in seconds. If the time exceeds 1 hour, we will return 1h
+     *
+     * @param seconds configured duration in seconds
+     * @return hadoop updated duration
+     */
+    private static String getHadoopDuration(int seconds) {
+        // constants for time thresholds
+        final int FIFTEEN_MINUTES_IN_SECONDS = 15 * 60;
+        final int ONE_HOUR_IN_SECONDS = 60 * 60;
+
+        // Adjust seconds to fit within bounds
+        if (seconds < FIFTEEN_MINUTES_IN_SECONDS) {
+            seconds = FIFTEEN_MINUTES_IN_SECONDS;
+        } else if (seconds > ONE_HOUR_IN_SECONDS) {
+            seconds = ONE_HOUR_IN_SECONDS;
+        }
+
+        // Convert seconds to minutes
+        int minutes = seconds / 60;
+
+        // Format the result
+        if (minutes == 60) {
+            return "1h";
+        } else {
+            return minutes + "m";
+        }
+    }
+
+    /**
      * Validate external dataset properties
      *
      * @param configuration properties
@@ -471,27 +506,6 @@
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (arnRole != null) {
-            return;
-        } else if (externalId != null) {
-            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
-                    EXTERNAL_ID_FIELD_NAME);
-        } else if (accessKeyId == null || secretAccessKey == null) {
-            // If one is passed, the other is required
-            if (accessKeyId != null) {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
-                        ACCESS_KEY_ID_FIELD_NAME);
-            } else if (secretAccessKey != null) {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
-                        SECRET_ACCESS_KEY_FIELD_NAME);
-            }
-        }
-
         validateIncludeExclude(configuration);
         try {
             // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index ef056c9..6f3a9b6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -48,6 +48,7 @@
 import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.LockList;
 import org.apache.asterix.common.metadata.MetadataConstants;
@@ -998,6 +999,7 @@
             configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
                     dataset.getDataverseName().getCanonicalForm());
+            setExternalEntityId(configuration, dataset);
             return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName,
                     configuration, itemType, null, warningCollector, filterEvaluatorFactory);
         } catch (Exception e) {
@@ -1005,6 +1007,20 @@
         }
     }
 
+    public void setExternalEntityId(Map<String, String> configuration, Dataset dataset) throws AlgebricksException {
+        configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset.getDatasetFullyQualifiedName().toString());
+    }
+
+    public void setExternalEntityIdFromParts(Map<String, String> configuration, String database,
+            DataverseName dataverse, String dataset, boolean isUuid) throws AlgebricksException {
+        if (isUuid) {
+            configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset);
+        } else {
+            DatasetFullyQualifiedName fqn = new DatasetFullyQualifiedName(database, dataverse, dataset);
+            configuration.put(ExternalDataConstants.KEY_ENTITY_ID, fqn.toString());
+        }
+    }
+
     public TxnId getTxnId() {
         return txnId;
     }