[ASTERIXDB-3514][EXT]: Cleanup on S3 cross-account auth
Ext-ref: MB-63505
Change-Id: Iba5dfd5e077f87aa0c1ee1cc81fcf197c9f06762
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19386
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index ed26f7c..fcdbabd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -18,26 +18,20 @@
*/
package org.apache.asterix.app.external;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalCredentialsCache;
-import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.util.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-
public class ExternalCredentialsCache implements IExternalCredentialsCache {
private static final Logger LOGGER = LogManager.getLogger();
- private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, TemporaryCredentials> cache = new ConcurrentHashMap<>();
private final int awsAssumeRoleDuration;
private final int refreshAwsAssumeRoleThresholdPercentage;
@@ -51,7 +45,7 @@
public synchronized Object get(String key) {
invalidateCache();
if (cache.containsKey(key)) {
- return cache.get(key).getRight();
+ return cache.get(key).getCredentials();
}
return null;
}
@@ -65,20 +59,9 @@
}
@Override
- public synchronized void put(String key, String type, Map<String, String> credentials) {
- if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
- updateAwsCache(key, credentials);
- }
- }
-
- private void updateAwsCache(String name, Map<String, String> credentials) {
- String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
-
- AwsSessionCredentials sessionCreds = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
- cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), sessionCreds));
- LOGGER.info("Received and cached new credentials for {}", name);
+ public synchronized void put(String key, Object credentials) {
+ cache.put(key, new TemporaryCredentials(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials));
+ LOGGER.info("Received and cached new credentials for {}", key);
}
/**
@@ -86,7 +69,7 @@
*/
private void invalidateCache() {
cache.entrySet().removeIf(entry -> {
- boolean shouldRemove = needsRefresh(entry.getValue().getLeft());
+ boolean shouldRemove = needsRefresh(entry.getValue().getDuration());
if (shouldRemove) {
LOGGER.info("Removing cached credentials for {} because it expired", entry.getKey());
}
@@ -106,4 +89,22 @@
int passedPercentage = (int) (passed * 100);
return passedPercentage > refreshAwsAssumeRoleThresholdPercentage;
}
+
+ static class TemporaryCredentials {
+ private final Span duration;
+ private final Object credentials;
+
+ public TemporaryCredentials(Span duration, Object credentials) {
+ this.duration = duration;
+ this.credentials = credentials;
+ }
+
+ public Span getDuration() {
+ return duration;
+ }
+
+ public Object getCredentials() {
+ return credentials;
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 29e3239..4c382d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -24,7 +24,6 @@
import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -39,10 +38,10 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -53,6 +52,11 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+/**
+ * The class is responsible for generating new credentials based on the adapter type. Given a request:
+ * - if we are the CC, generate new creds and ask all NCs to update their cache
+ * - if we are the NC, send a message to the CC to generate new creds
+ */
public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCacheUpdater {
private static final Logger LOGGER = LogManager.getLogger();
@@ -66,49 +70,46 @@
public synchronized Object generateAndCacheCredentials(Map<String, String> configuration)
throws HyracksDataException, CompilationException {
IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
- String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
- Object credentials = cache.get(name);
+ String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+ Object credentials = cache.get(key);
if (credentials != null) {
return credentials;
}
- /*
- * if we are the CC, generate new creds and ask all NCs to update their cache
- * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache
- */
- if (appCtx instanceof ICcApplicationContext ccAppCtx) {
- IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
- if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
- throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
- }
+ String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+ if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
+ credentials = generateAwsCredentials(configuration);
+ }
- String accessKeyId;
- String secretAccessKey;
- String sessionToken;
- Map<String, String> credentialsMap = new HashMap<>();
+ return credentials;
+ }
+
+ // TODO: this can probably be refactored out into something that is AWS-specific
+ private Object generateAwsCredentials(Map<String, String> configuration)
+ throws HyracksDataException, CompilationException {
+ String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
+ AwsSessionCredentials credentials;
+ if (appCtx instanceof ICcApplicationContext) {
+ validateClusterState();
try {
- LOGGER.info("attempting to update credentials for {}", name);
+ LOGGER.info("attempting to update AWS credentials for {}", key);
AwsCredentialsProvider newCredentials = S3AuthUtils.assumeRoleAndGetCredentials(configuration);
- LOGGER.info("updated credentials successfully for {}", name);
- AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) newCredentials.resolveCredentials();
- accessKeyId = sessionCredentials.accessKeyId();
- secretAccessKey = sessionCredentials.secretAccessKey();
- sessionToken = sessionCredentials.sessionToken();
+ LOGGER.info("updated AWS credentials successfully for {}", key);
+ credentials = (AwsSessionCredentials) newCredentials.resolveCredentials();
+ appCtx.getExternalCredentialsCache().put(key, credentials);
} catch (CompilationException ex) {
- LOGGER.info("failed to refresh credentials for {}", name, ex);
+ LOGGER.info("failed to refresh AWS credentials for {}", key, ex);
throw ex;
}
- // credentials need refreshing
- credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, accessKeyId);
- credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, secretAccessKey);
- credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, sessionToken);
+ String accessKeyId = credentials.accessKeyId();
+ String secretAccessKey = credentials.secretAccessKey();
+ String sessionToken = credentials.sessionToken();
+ UpdateAwsCredentialsCacheRequest request =
+ new UpdateAwsCredentialsCacheRequest(configuration, accessKeyId, secretAccessKey, sessionToken);
// request all NCs to update their credentials cache with the latest creds
- updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration);
- String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
- cache.put(name, type, credentialsMap);
- credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+ updateNcsCredentialsCache(key, request);
} else {
NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
MessageFuture messageFuture = broker.registerMessageFuture();
@@ -116,7 +117,7 @@
long futureId = messageFuture.getFutureId();
RefreshAwsCredentialsRequest request = new RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
try {
- LOGGER.info("no valid credentials found for {}, requesting credentials from CC", name);
+ LOGGER.info("no valid AWS credentials found for {}, requesting AWS credentials from CC", key);
broker.sendMessageToPrimaryCC(request);
RefreshAwsCredentialsResponse response = (RefreshAwsCredentialsResponse) messageFuture
.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -126,7 +127,7 @@
credentials = AwsSessionCredentials.create(response.getAccessKeyId(), response.getSecretAccessKey(),
response.getSessionToken());
} catch (Exception ex) {
- LOGGER.info("failed to refresh credentials for {}", name, ex);
+ LOGGER.info("failed to refresh AWS credentials for {}", key, ex);
throw HyracksDataException.create(ex);
} finally {
broker.deregisterMessageFuture(futureId);
@@ -135,14 +136,12 @@
return credentials;
}
- private void updateNcsCredentialsCache(ICcApplicationContext appCtx, String name, Map<String, String> credentials,
- Map<String, String> configuration) throws HyracksDataException {
- final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+ private void updateNcsCredentialsCache(String key, INcAddressedMessage request) throws HyracksDataException {
+ ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+ final List<String> ncs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes());
CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- UpdateAwsCredentialsCacheRequest request = new UpdateAwsCredentialsCacheRequest(configuration, credentials);
-
try {
- LOGGER.info("requesting all NCs to update their credentials for {}", name);
+ LOGGER.info("requesting all NCs to update their credentials for {}", key);
for (String nc : ncs) {
broker.sendApplicationMessageToNC(request, nc);
}
@@ -151,4 +150,12 @@
throw HyracksDataException.create(e);
}
}
+
+ private void validateClusterState() throws HyracksDataException {
+ ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+ IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
+ if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+ throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
index 438e425..753dbf1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -23,26 +23,31 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
- private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 1L;
private final Map<String, String> configuration;
- private final Map<String, String> credentials;
+ private final String accessKeyId;
+ private final String secretAccessKey;
+ private final String sessionToken;
- public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, Map<String, String> credentials) {
+ public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, String accessKeyId,
+ String secretAccessKey, String sessionToken) {
this.configuration = configuration;
- this.credentials = credentials;
+ this.accessKeyId = accessKeyId;
+ this.secretAccessKey = secretAccessKey;
+ this.sessionToken = sessionToken;
+
}
@Override
public void handle(INcApplicationContext appCtx) {
String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
- String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
- appCtx.getExternalCredentialsCache().put(name, type, credentials);
+ AwsSessionCredentials credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+ appCtx.getExternalCredentialsCache().put(name, credentials);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 97bc3cd..8ba9a02 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -579,17 +579,11 @@
// async queries are completed after their job completes
if (ResultDelivery.ASYNC != resultDelivery) {
appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
- postRequestCompleteCleanup(requestParameters);
}
Thread.currentThread().setName(threadName);
}
}
- protected void postRequestCompleteCleanup(IRequestParameters requestParameters) {
- String uuid = requestParameters.getRequestReference().getUuid();
- appCtx.getExternalCredentialsCache().delete(uuid);
- }
-
protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config,
Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters,
Statement statement) {
@@ -1038,12 +1032,8 @@
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType) itemType);
- Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties,
- dd.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider);
- // do any necessary validation on the copy to avoid changing the original and storing it in the metadata
- metadataProvider.setExternalEntityIdFromParts(propertiesCopy, databaseName, dataverseName,
- datasetName, false);
- validateAdapterSpecificProperties(propertiesCopy, dd.getSourceLocation(), appCtx);
+ validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
+ appCtx, metadataProvider);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -2459,7 +2449,6 @@
sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
- deleteDatasetCachedCredentials(ds);
return true;
} catch (Exception e) {
LOGGER.error("failed to drop dataset; executing compensating operations", e);
@@ -2507,10 +2496,6 @@
}
}
- protected void deleteDatasetCachedCredentials(Dataset dataset) throws CompilationException {
- appCtx.getExternalCredentialsCache().delete(dataset.getDatasetFullyQualifiedName().toString());
- }
-
protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
@@ -4057,11 +4042,8 @@
copyStmt, itemType, mdTxnCtx, metadataProvider);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
- Map<String, String> propertiesCopy = preparePropertiesCopyForValidation(externalDetails, properties,
- copyStmt.getSourceLocation(), mdTxnCtx, appCtx, metadataProvider);
- // do any necessary validation on the copy to avoid changing the original and storing it in the metadata
- metadataProvider.setExternalEntityId(propertiesCopy, dataset);
- validateAdapterSpecificProperties(propertiesCopy, copyStmt.getSourceLocation(), appCtx);
+ validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx,
+ appCtx, metadataProvider);
CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName,
copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties);
cls.setSourceLocation(stmt.getSourceLocation());
@@ -4118,9 +4100,6 @@
ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams,
Stats stats) throws Exception {
CopyToStatement copyTo = (CopyToStatement) stmt;
- Namespace namespace = copyTo.getNamespace();
- DataverseName dataverseName = namespace.getDataverseName();
- String databaseName = namespace.getDatabaseName();
final IRequestTracker requestTracker = appCtx.getRequestTracker();
final ClientRequest clientRequest =
(ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
@@ -4149,14 +4128,9 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl();
- Map<String, String> properties = createAndValidateAdapterConfigurationForCopyToStmt(edd,
+ edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd,
ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx,
- metadataProvider);
-
- // request id is used to cache credentials if needed, and clear them after request is done
- String uuid = requestParameters.getRequestReference().getUuid();
- metadataProvider.setExternalEntityIdFromParts(properties, databaseName, dataverseName, uuid, true);
- edd.setProperties(properties);
+ metadataProvider));
if (ExternalDataConstants.FORMAT_PARQUET
.equalsIgnoreCase(edd.getProperties().get(ExternalDataConstants.KEY_FORMAT))) {
@@ -5585,7 +5559,6 @@
// complete async jobs after their job completes
if (ResultDelivery.ASYNC == resultDelivery) {
requestTracker.complete(clientRequest.getId());
- postRequestCompleteCleanup(requestParameters);
}
locker.unlock();
}
@@ -5802,8 +5775,7 @@
* @param details external details
* @param sourceLoc source location
*/
- protected void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc)
- throws CompilationException {
+ private void normalizeAdapters(ExternalDetailsDecl details, SourceLocation sourceLoc) throws CompilationException {
String adapter = details.getAdapter();
Optional<String> normalizedAdapter =
getSupportedAdapters().stream().filter(k -> k.equalsIgnoreCase(adapter)).findFirst();
@@ -5817,15 +5789,17 @@
return ExternalDataConstants.EXTERNAL_READ_ADAPTERS;
}
- protected Map<String, String> preparePropertiesCopyForValidation(ExternalDetailsDecl externalDetails,
+ protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx,
IApplicationContext appCtx, MetadataProvider metadataProvider)
throws AlgebricksException, HyracksDataException {
+ // Validate adapter specific properties
normalizeAdapters(externalDetails, srcLoc);
String adapter = externalDetails.getAdapter();
Map<String, String> details = new HashMap<>(properties);
details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
- return details;
+ metadataProvider.setExternalEntityId(details);
+ validateAdapterSpecificProperties(details, srcLoc, appCtx);
}
protected Map<String, String> createAndValidateAdapterConfigurationForCopyToStmt(
@@ -5835,6 +5809,7 @@
String adapterName = externalDetailsDecl.getAdapter();
Map<String, String> properties = externalDetailsDecl.getProperties();
properties.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName);
+ md.setExternalEntityId(properties);
WriterValidationUtil.validateWriterConfiguration(adapterName, supportedAdapters, properties, sourceLocation);
return properties;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 3a9ae1c..689ff16 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.common.external;
-import java.util.Map;
-
public interface IExternalCredentialsCache {
/**
@@ -41,8 +39,7 @@
* Updates the credentials cache with the provided credentials for the specified name
*
* @param key credentials key
- * @param type credentials type
* @param credentials credentials to cache
*/
- void put(String key, String type, Map<String, String> credentials);
+ void put(String key, Object credentials);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 6f3a9b6..20299d5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INamespaceResolver;
@@ -48,7 +49,6 @@
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.LockList;
import org.apache.asterix.common.metadata.MetadataConstants;
@@ -999,7 +999,8 @@
configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName());
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
- setExternalEntityId(configuration, dataset);
+ setExternalEntityId(configuration);
+ setSourceType(configuration, adapterName);
return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName,
configuration, itemType, null, warningCollector, filterEvaluatorFactory);
} catch (Exception e) {
@@ -1007,18 +1008,12 @@
}
}
- public void setExternalEntityId(Map<String, String> configuration, Dataset dataset) throws AlgebricksException {
- configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset.getDatasetFullyQualifiedName().toString());
+ protected void setSourceType(Map<String, String> configuration, String adapterName) {
+ configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapterName);
}
- public void setExternalEntityIdFromParts(Map<String, String> configuration, String database,
- DataverseName dataverse, String dataset, boolean isUuid) throws AlgebricksException {
- if (isUuid) {
- configuration.put(ExternalDataConstants.KEY_ENTITY_ID, dataset);
- } else {
- DatasetFullyQualifiedName fqn = new DatasetFullyQualifiedName(database, dataverse, dataset);
- configuration.put(ExternalDataConstants.KEY_ENTITY_ID, fqn.toString());
- }
+ public void setExternalEntityId(Map<String, String> configuration) throws AlgebricksException {
+ configuration.put(ExternalDataConstants.KEY_ENTITY_ID, UUID.randomUUID().toString());
}
public TxnId getTxnId() {