[ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth

Ext-ref: MB-63505
Change-Id: Iba5dfd5e077f87aa0c1ee1cc81fcf197c9f06762
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index ed26f7c..fcdbabd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -18,26 +18,20 @@
  */
 package org.apache.asterix.app.external;
 
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.util.Span;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-
 public class ExternalCredentialsCache implements IExternalCredentialsCache {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, TemporaryCredentials> cache = new ConcurrentHashMap<>();
     private final int awsAssumeRoleDuration;
     private final int refreshAwsAssumeRoleThresholdPercentage;
 
@@ -51,7 +45,7 @@
     public synchronized Object get(String key) {
         invalidateCache();
         if (cache.containsKey(key)) {
-            return cache.get(key).getRight();
+            return cache.get(key).getCredentials();
         }
         return null;
     }
@@ -65,20 +59,9 @@
     }
 
     @Override
-    public synchronized void put(String key, String type, Map<String, String> credentials) {
-        if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
-            updateAwsCache(key, credentials);
-        }
-    }
-
-    private void updateAwsCache(String name, Map<String, String> credentials) {
-        String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
-
-        AwsSessionCredentials sessionCreds = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
-        cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), sessionCreds));
-        LOGGER.info("Received and cached new credentials for {}", name);
+    public synchronized void put(String key, Object credentials) {
+        cache.put(key, new TemporaryCredentials(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials));
+        LOGGER.info("Received and cached new credentials for {}", key);
     }
 
     /**
@@ -86,7 +69,7 @@
      */
     private void invalidateCache() {
         cache.entrySet().removeIf(entry -> {
-            boolean shouldRemove = needsRefresh(entry.getValue().getLeft());
+            boolean shouldRemove = needsRefresh(entry.getValue().getDuration());
             if (shouldRemove) {
                 LOGGER.info("Removing cached credentials for {} because it expired", entry.getKey());
             }
@@ -106,4 +89,22 @@
         int passedPercentage = (int) (passed * 100);
         return passedPercentage > refreshAwsAssumeRoleThresholdPercentage;
     }
+
+    static class TemporaryCredentials {
+        private final Span duration;
+        private final Object credentials;
+
+        public TemporaryCredentials(Span duration, Object credentials) {
+            this.duration = duration;
+            this.credentials = credentials;
+        }
+
+        public Span getDuration() {
+            return duration;
+        }
+
+        public Object getCredentials() {
+            return credentials;
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 29e3239..4c382d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -24,7 +24,6 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -39,10 +38,10 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
 import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -53,6 +52,11 @@
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
+/**
+ * The class is responsible for generating new credentials based on the adapter type. Given a request:
+ * - if we are the CC, generate new creds and ask all NCs to update their cache
+ * - if we are the NC, send a message to the CC to generate new creds
+ */
 public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCacheUpdater {
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -66,49 +70,46 @@
     public synchronized Object generateAndCacheCredentials(Map<String, String> configuration)
             throws HyracksDataException, CompilationException {
         IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
-        String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
-        Object credentials = cache.get(name);
+        String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+        Object credentials = cache.get(key);
         if (credentials != null) {
             return credentials;
         }
 
-        /*
-         * if we are the CC, generate new creds and ask all NCs to update their cache
-         * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache
-         */
-        if (appCtx instanceof ICcApplicationContext ccAppCtx) {
-            IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
-            if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
-                throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
-            }
+        String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+        if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
+            credentials = generateAwsCredentials(configuration);
+        }
 
-            String accessKeyId;
-            String secretAccessKey;
-            String sessionToken;
-            Map<String, String> credentialsMap = new HashMap<>();
+        return credentials;
+    }
+
+    // TODO: this can probably be refactored out into something that is AWS-specific
+    private Object generateAwsCredentials(Map<String, String> configuration)
+            throws HyracksDataException, CompilationException {
+        String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+        AwsSessionCredentials credentials;
+        if (appCtx instanceof ICcApplicationContext) {
+            validateClusterState();
             try {
-                LOGGER.info("attempting to update credentials for {}", name);
+                LOGGER.info("attempting to update AWS credentials for {}", key);
                 AwsCredentialsProvider newCredentials = S3AuthUtils.assumeRoleAndGetCredentials(configuration);
-                LOGGER.info("updated credentials successfully for {}", name);
-                AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) newCredentials.resolveCredentials();
-                accessKeyId = sessionCredentials.accessKeyId();
-                secretAccessKey = sessionCredentials.secretAccessKey();
-                sessionToken = sessionCredentials.sessionToken();
+                LOGGER.info("updated AWS credentials successfully for {}", key);
+                credentials = (AwsSessionCredentials) newCredentials.resolveCredentials();
+                appCtx.getExternalCredentialsCache().put(key, credentials);
             } catch (CompilationException ex) {
-                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                LOGGER.info("failed to refresh AWS credentials for {}", key, ex);
                 throw ex;
             }
 
-            // credentials need refreshing
-            credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, accessKeyId);
-            credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, secretAccessKey);
-            credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, sessionToken);
+            String accessKeyId = credentials.accessKeyId();
+            String secretAccessKey = credentials.secretAccessKey();
+            String sessionToken = credentials.sessionToken();
+            UpdateAwsCredentialsCacheRequest request =
+                    new UpdateAwsCredentialsCacheRequest(configuration, accessKeyId, secretAccessKey, sessionToken);
 
             // request all NCs to update their credentials cache with the latest creds
-            updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration);
-            String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
-            cache.put(name, type, credentialsMap);
-            credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+            updateNcsCredentialsCache(key, request);
         } else {
             NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
             MessageFuture messageFuture = broker.registerMessageFuture();
@@ -116,7 +117,7 @@
             long futureId = messageFuture.getFutureId();
             RefreshAwsCredentialsRequest request = new RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
             try {
-                LOGGER.info("no valid credentials found for {}, requesting credentials from CC", name);
+                LOGGER.info("no valid AWS credentials found for {}, requesting AWS credentials from CC", key);
                 broker.sendMessageToPrimaryCC(request);
                 RefreshAwsCredentialsResponse response = (RefreshAwsCredentialsResponse) messageFuture
                         .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -126,7 +127,7 @@
                 credentials = AwsSessionCredentials.create(response.getAccessKeyId(), response.getSecretAccessKey(),
                         response.getSessionToken());
             } catch (Exception ex) {
-                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                LOGGER.info("failed to refresh AWS credentials for {}", key, ex);
                 throw HyracksDataException.create(ex);
             } finally {
                 broker.deregisterMessageFuture(futureId);
@@ -135,14 +136,12 @@
         return credentials;
     }
 
-    private void updateNcsCredentialsCache(ICcApplicationContext appCtx, String name, Map<String, String> credentials,
-            Map<String, String> configuration) throws HyracksDataException {
-        final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+    private void updateNcsCredentialsCache(String key, INcAddressedMessage request) throws HyracksDataException {
+        ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+        final List<String> ncs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes());
         CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
-        UpdateAwsCredentialsCacheRequest request = new UpdateAwsCredentialsCacheRequest(configuration, credentials);
-
         try {
-            LOGGER.info("requesting all NCs to update their credentials for {}", name);
+            LOGGER.info("requesting all NCs to update their credentials for {}", key);
             for (String nc : ncs) {
                 broker.sendApplicationMessageToNC(request, nc);
             }
@@ -151,4 +150,12 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    private void validateClusterState() throws HyracksDataException {
+        ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+        IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
+        if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+            throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index 438e425..753dbf1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -23,26 +23,31 @@
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 
 public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
 
-    private static final Logger LOGGER = LogManager.getLogger();
     private static final long serialVersionUID = 1L;
     private final Map<String, String> configuration;
-    private final Map<String, String> credentials;
+    private final String accessKeyId;
+    private final String secretAccessKey;
+    private final String sessionToken;
 
-    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, Map<String, String> credentials) {
+    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, String accessKeyId,
+            String secretAccessKey, String sessionToken) {
         this.configuration = configuration;
-        this.credentials = credentials;
+        this.accessKeyId = accessKeyId;
+        this.secretAccessKey = secretAccessKey;
+        this.sessionToken = sessionToken;
+
     }
 
     @Override
     public void handle(INcApplicationContext appCtx) {
         String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
-        String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
-        appCtx.getExternalCredentialsCache().put(name, type, credentials);
+        AwsSessionCredentials credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+        appCtx.getExternalCredentialsCache().put(name, credentials);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 97bc3cd..8ba9a02 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -579,17 +579,11 @@
             // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
                 appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
-                postRequestCompleteCleanup(requestParameters);
             }
             Thread.currentThread().setName(threadName);
         }
     }
 
-    protected void postRequestCompleteCleanup(IRequestParameters requestParameters) {
-        String uuid = requestParameters.getRequestReference().getUuid();
-        appCtx.getExternalCredentialsCache().delete(uuid);
-    }
-
     protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config,
             Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters,
             Statement statement) {
@@ -1038,12 +1032,8 @@
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     ExternalDataUtils.validateType(properties, (ARecordType) itemType);
-                    Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties,
-                            dd.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider);
-                    // do any necessary validation on the copy to avoid changing the original and storing it in the metadata
-                    metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName, dataverseName,
-                            datasetName, false);
-                    validateAdapterSpecificProperties(propertiesCopy, dd.getSourceLocation(), appCtx);
+                    validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
+                            appCtx, metadataProvider);
                     datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
                             TransactionState.COMMIT);
                     break;
@@ -2459,7 +2449,6 @@
                     sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            deleteDatasetCachedCredentials(ds);
             return true;
         } catch (Exception e) {
             LOGGER.error("failed to drop dataset; executing compensating operations", e);
@@ -2507,10 +2496,6 @@
         }
     }
 
-    protected void deleteDatasetCachedCredentials(Dataset dataset) throws CompilationException {
-        appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString());
-    }
-
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
@@ -4057,11 +4042,8 @@
                     copyStmt, itemType, mdTxnCtx, metadataProvider);
             ExternalDataUtils.normalize(properties);
             ExternalDataUtils.validate(properties);
-            Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties,
-                    copyStmt.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider);
-            // do any necessary validation on the copy to avoid changing the original and storing it in the metadata
-            metadataProvider.setExternalEntityId(propertiesCopy, dataset);
-            validateAdapterSpecificProperties(propertiesCopy, copyStmt.getSourceLocation(), appCtx);
+            validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
+                    appCtx, metadataProvider);
             CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName,
                     copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
             cls.setSourceLocation(stmt.getSourceLocation());
@@ -4118,9 +4100,6 @@
             ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams,
             Stats stats) throws Exception {
         CopyToStatement copyTo = (CopyToStatement) stmt;
-        Namespace namespace = copyTo.getNamespace();
-        DataverseName dataverseName = namespace.getDataverseName();
-        String databaseName = namespace.getDatabaseName();
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         final ClientRequest clientRequest =
                 (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4149,14 +4128,9 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             try {
                 ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
-                Map<String, String> properties = createAndValidateAdapterConfigurationForCopyToStmt(edd,
+                edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
                         ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
-                        metadataProvider);
-
-                // request id is used to cache credentials if needed, and clear them after request is done
-                String uuid = requestParameters.getRequestReference().getUuid();
-                metadataProvider.setExternalEntityIdFromParts(properties, databaseName, dataverseName, uuid, true);
-                edd.setProperties(properties);
+                        metadataProvider));
 
                 if (ExternalDataConstants.FORMAT_PARQUET
                         .equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
@@ -5585,7 +5559,6 @@
             // complete async jobs after their job completes
             if (ResultDelivery.ASYNC == resultDelivery) {
                 requestTracker.complete(clientRequest.getId());
-                postRequestCompleteCleanup(requestParameters);
             }
             locker.unlock();
         }
@@ -5802,8 +5775,7 @@
      * @param details external details
      * @param sourceLoc source location
      */
-    protected void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc)
-            throws CompilationException {
+    private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) throws CompilationException {
         String adapter = details.getAdapter();
         Optional<String> normalizedAdapter =
                 getSupportedAdapters().stream().filter(k -> k.equalsIgnoreCase(adapter)).findFirst();
@@ -5817,15 +5789,17 @@
         return ExternalDataConstants.EXTERNAL_READ_ADAPTERS;
     }
 
-    protected Map<String, String> preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails,
+    protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
             Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx,
             IApplicationContext appCtx, MetadataProvider metadataProvider)
             throws AlgebricksException, HyracksDataException {
+        // Validate adapter specific properties
         normalizeAdapters(externalDetails, srcLoc);
         String adapter = externalDetails.getAdapter();
         Map<String, String> details = new HashMap<>(properties);
         details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
-        return details;
+        metadataProvider.setExternalEntityId(details);
+        validateAdapterSpecificProperties(details, srcLoc, appCtx);
     }
 
     protected Map<String, String> createAndValidateAdapterConfigurationForCopyToStmt(
@@ -5835,6 +5809,7 @@
         String adapterName = externalDetailsDecl.getAdapter();
         Map<String, String> properties = externalDetailsDecl.getProperties();
         properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName);
+        md.setExternalEntityId(properties);
         WriterValidationUtil.validateWriterConfiguration(adapterName, supportedAdapters, properties, sourceLocation);
         return properties;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 3a9ae1c..689ff16 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.common.external;
 
-import java.util.Map;
-
 public interface IExternalCredentialsCache {
 
     /**
@@ -41,8 +39,7 @@
      * Updates the credentials cache with the provided credentials for the specified name
      *
      * @param key credentials key
-     * @param type credentials type
      * @param credentials credentials to cache
      */
-    void put(String key, String type, Map<String, String> credentials);
+    void put(String key, Object credentials);
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6f3a9b6..20299d5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,6 +32,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INamespaceResolver;
@@ -48,7 +49,6 @@
 import org.apache.asterix.common.external.IDataSourceAdapter;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.LockList;
 import org.apache.asterix.common.metadata.MetadataConstants;
@@ -999,7 +999,8 @@
             configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName());
             configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
                     dataset.getDataverseName().getCanonicalForm());
-            setExternalEntityId(configuration, dataset);
+            setExternalEntityId(configuration);
+            setSourceType(configuration, adapterName);
             return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName,
                     configuration, itemType, null, warningCollector, filterEvaluatorFactory);
         } catch (Exception e) {
@@ -1007,18 +1008,12 @@
         }
     }
 
-    public void setExternalEntityId(Map<String, String> configuration, Dataset dataset) throws AlgebricksException {
-        configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset.getDatasetFullyQualifiedName().toString());
+    protected void setSourceType(Map<String, String> configuration, String adapterName) {
+        configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName);
     }
 
-    public void setExternalEntityIdFromParts(Map<String, String> configuration, String database,
-            DataverseName dataverse, String dataset, boolean isUuid) throws AlgebricksException {
-        if (isUuid) {
-            configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset);
-        } else {
-            DatasetFullyQualifiedName fqn = new DatasetFullyQualifiedName(database, dataverse, dataset);
-            configuration.put(ExternalDataConstants.KEY_ENTITY_ID, fqn.toString());
-        }
+    public void setExternalEntityId(Map<String, String> configuration) throws AlgebricksException {
+        configuration.put(ExternalDataConstants.KEY_ENTITY_ID, UUID.randomUUID().toString());
     }
 
     public TxnId getTxnId() {