[ASTERIXDB-3514][EXT]: Add error codes for missing invalid/missing creds to assume role

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add error codes for missing/temporary credentials, need
  long-lived credentials to assume the role.
- Unify getting name to update/delete cache.

Ext-ref: MB-63505
Change-Id: I3ea48cc83d4c0b92d66e518f7e8108050f0e553a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19344
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index 66fbdec..8cb3a47 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -24,7 +24,13 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IFullyQualifiedName;
 import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
@@ -48,8 +54,14 @@
     }
 
     @Override
-    public synchronized Object getCredentials(Map<String, String> configuration) {
-        String name = getName(configuration);
+    public synchronized Object getCredentials(Map<String, String> configuration) throws CompilationException {
+        IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration);
+        return getCredentials(fqn);
+    }
+
+    @Override
+    public synchronized Object getCredentials(IFullyQualifiedName fqn) {
+        String name = getName(fqn);
         if (cache.containsKey(name) && !needsRefresh(cache.get(name).getLeft())) {
             return cache.get(name).getRight();
         }
@@ -57,42 +69,55 @@
     }
 
     @Override
-    public synchronized void updateCache(Map<String, String> configuration, Map<String, String> credentials) {
-        String type = configuration.get(ExternalDataConstants.KEY_READER);
+    public synchronized void updateCache(Map<String, String> configuration, Map<String, String> credentials)
+            throws CompilationException {
+        IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration);
+        String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+        updateCache(fqn, type, credentials);
+    }
+
+    @Override
+    public synchronized void updateCache(IFullyQualifiedName fqn, String type, Map<String, String> credentials) {
+        String name = getName(fqn);
         if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
-            updateAwsCache(configuration, credentials);
+            updateAwsCache(name, credentials);
         }
     }
 
-    @Override
-    public void deleteCredentials(String name) {
-        cache.remove(name);
-    }
-
-    @Override
-    public String getName(Map<String, String> configuration) {
-        String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
-        if (database == null) {
-            database = MetadataConstants.DEFAULT_DATABASE;
-        }
-        String dataverse = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
-        String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
-        return String.join(".", database, dataverse, dataset);
-    }
-
-    private void updateAwsCache(Map<String, String> configuration, Map<String, String> credentials) {
+    private void updateAwsCache(String name, Map<String, String> credentials) {
         String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
         String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
-        doUpdateAwsCache(configuration, AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+        doUpdateAwsCache(name, AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
     }
 
-    private void doUpdateAwsCache(Map<String, String> configuration, AwsSessionCredentials credentials) {
-        String name = getName(configuration);
+    private void doUpdateAwsCache(String name, AwsSessionCredentials credentials) {
         cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials));
         LOGGER.info("Received and cached new credentials for {}", name);
     }
 
+    @Override
+    public void deleteCredentials(IFullyQualifiedName fqn) {
+        String name = getName(fqn);
+        Object removed = cache.remove(name);
+        if (removed != null) {
+            LOGGER.info("Removed cached credentials for {}", name);
+        } else {
+            LOGGER.info("No cached credentials found for {}, nothing to remove", name);
+        }
+    }
+
+    @Override
+    public String getName(Map<String, String> configuration) throws CompilationException {
+        IFullyQualifiedName fqn = getFullyQualifiedNameFromConfiguration(configuration);
+        return getName(fqn);
+    }
+
+    @Override
+    public String getName(IFullyQualifiedName fqn) {
+        return fqn.toString();
+    }
+
     /**
      * Refresh if the remaining time is less than the configured refresh percentage
      *
@@ -103,4 +128,24 @@
         return (double) span.remaining(TimeUnit.SECONDS)
                 / span.getSpan(TimeUnit.SECONDS) < refreshAwsAssumeRoleThreshold;
     }
+
+    protected IFullyQualifiedName getFullyQualifiedNameFromConfiguration(Map<String, String> configuration)
+            throws CompilationException {
+        String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
+        if (database == null) {
+            database = MetadataConstants.DEFAULT_DATABASE;
+        }
+        String stringDataverse = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
+        DataverseName dataverse = getDataverseName(stringDataverse);
+        String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
+        return new DatasetFullyQualifiedName(database, dataverse, dataset);
+    }
+
+    protected DataverseName getDataverseName(String dataverse) throws CompilationException {
+        try {
+            return DataverseName.createSinglePartName(dataverse);
+        } catch (AsterixException ex) {
+            throw new CompilationException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, dataverse);
+        }
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index 44d4c21..d1a9ebf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -21,10 +21,15 @@
 import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final long serialVersionUID = 1L;
     private final Map<String, String> configuration;
     private final Map<String, String> credentials;
@@ -35,8 +40,13 @@
     }
 
     @Override
-    public void handle(INcApplicationContext appCtx) {
-        appCtx.getExternalCredentialsCache().updateCache(configuration, credentials);
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException {
+        try {
+            appCtx.getExternalCredentialsCache().updateCache(configuration, credentials);
+        } catch (CompilationException ex) {
+            LOGGER.info("Failed to process request", ex);
+            throw HyracksDataException.create(ex);
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 4ee674e..51371da 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2448,8 +2448,9 @@
                     sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            appCtx.getExternalCredentialsCache()
-                    .deleteCredentials(String.join(".", databaseName, dataverseName.getCanonicalForm(), datasetName));
+            if (ds.getDatasetType().equals(DatasetType.EXTERNAL)) {
+                appCtx.getExternalCredentialsCache().deleteCredentials(ds.getDatasetFullyQualifiedName());
+            }
             return true;
         } catch (Exception e) {
             LOGGER.error("failed to drop dataset; executing compensating operations", e);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index afc43e2..d06a9e5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -318,6 +318,8 @@
     COULD_NOT_CREATE_TOKENS(1211),
     NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION(1212),
     FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION(1213),
+    LONG_LIVED_CREDENTIALS_NEEDED_TO_ASSUME_ROLE(1214),
+    TEMPORARY_CREDENTIALS_CANNOT_BE_USED_TO_ASSUME_ROLE(1215),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index c603893..3ff444d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -20,6 +20,9 @@
 
 import java.util.Map;
 
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.metadata.IFullyQualifiedName;
+
 public interface IExternalCredentialsCache {
 
     /**
@@ -28,7 +31,15 @@
      * @param configuration configuration containing external collection details
      * @return credentials if present, null otherwise
      */
-    Object getCredentials(Map<String, String> configuration);
+    Object getCredentials(Map<String, String> configuration) throws CompilationException;
+
+    /**
+     * Returns the cached credentials. Can be of any supported external credentials type
+     *
+     * @param fqn fully qualified name of the credentials entity
+     * @return credentials if present, null otherwise
+     */
+    Object getCredentials(IFullyQualifiedName fqn) throws CompilationException;
 
     /**
      * Updates the credentials cache with the provided credentials for the specified name
@@ -36,14 +47,23 @@
      * @param configuration configuration containing external collection details
      * @param credentials credentials to cache
      */
-    void updateCache(Map<String, String> configuration, Map<String, String> credentials);
+    void updateCache(Map<String, String> configuration, Map<String, String> credentials) throws CompilationException;
 
     /**
-     * Deletes the cache for the provided entity name
+     * Updates the credentials cache with the provided credentials for the specified name
      *
-     * @param name name of the entity for which the credentials are to be deleted
+     * @param fqn fully qualified name for the credentials entity
+     * @param type type of the entity
+     * @param credentials credentials to cache
      */
-    void deleteCredentials(String name);
+    void updateCache(IFullyQualifiedName fqn, String type, Map<String, String> credentials);
+
+    /**
+     * Deletes the cache for the provided enitty
+     *
+     * @param fqn fully qualified name of entity for which the credentials are to be deleted
+     */
+    void deleteCredentials(IFullyQualifiedName fqn);
 
     /**
      * Returns the name of the entity which the cached credentials belong to
@@ -51,5 +71,13 @@
      * @param configuration configuration containing external collection details
      * @return name of entity which credentials belong to
      */
-    String getName(Map<String, String> configuration);
+    String getName(Map<String, String> configuration) throws CompilationException;
+
+    /**
+     * Returns the name of the entity which the cached credentials belong to
+     *
+     * @param fqn fully qualified name for the credentials entity
+     * @return name of entity which credentials belong to
+     */
+    String getName(IFullyQualifiedName fqn) throws CompilationException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DatasetFullyQualifiedName.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DatasetFullyQualifiedName.java
index 261cddc..e6b66e6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DatasetFullyQualifiedName.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/DatasetFullyQualifiedName.java
@@ -21,7 +21,7 @@
 import java.io.Serializable;
 import java.util.Objects;
 
-public class DatasetFullyQualifiedName implements Serializable {
+public class DatasetFullyQualifiedName implements Serializable, IFullyQualifiedName {
 
     private static final long serialVersionUID = 2L;
     private final String databaseName;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IFullyQualifiedName.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IFullyQualifiedName.java
new file mode 100644
index 0000000..f099902
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IFullyQualifiedName.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.metadata;
+
+public interface IFullyQualifiedName {
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 6900de3..c1ffd13 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -320,6 +320,8 @@
 1211 = Could not create delegation tokens
 1212 = No credentials found for cross-account authentication. Expected instance profile or access key id & secret access key for assuming role
 1213 = Failed to perform cross-account authentication. Encountered error : '%1$s'
+1214 = Long-lived credentials are required to assume a role
+1215 = Temporary credentials cannot be used to assume a role
 
 # Feed Errors
 3001 = Illegal state.