[ASTERIXDB-3514][EXT]: Assume role only when temporary credentials expire
- user model changes: no
- storage format changes: no
- interface changes: yes
When using temporary credentials obtained by assuming
a role, cache the credentials and refresh them only
when they have expired.
Ext-ref: MB-63505
Change-Id: I622853b794f81cd4bda84964a6cf7041d889d20f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19067
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index 6ea7aeb..e892d04 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -24,6 +24,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -55,6 +57,8 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
import org.apache.asterix.common.external.IAdapterFactoryService;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -127,6 +131,8 @@
private final IOManager ioManager;
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
+ private final IExternalCredentialsCache externalCredentialsCache;
+ private final IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -177,6 +183,8 @@
this.globalTxManager = globalTxManager;
this.ioManager = ioManager;
dataPartitioningProvider = DataPartitioningProvider.create(this);
+ externalCredentialsCache = new ExternalCredentialsCache();
+ externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
}
@Override
@@ -415,4 +423,14 @@
public IOManager getIoManager() {
return ioManager;
}
+
+ @Override
+ public IExternalCredentialsCache getExternalCredentialsCache() {
+ return externalCredentialsCache;
+ }
+
+ @Override
+ public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() {
+ return externalCredentialsCacheUpdater;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
new file mode 100644
index 0000000..0ddca4e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.metadata.MetadataConstants;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.util.Span;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+/**
+ * In-memory cache of temporary credentials used to access external data. Entries are keyed by the
+ * database, dataverse and dataset names found in the configuration (see {@link #getName(Map)}).
+ */
+public class ExternalCredentialsCache implements IExternalCredentialsCache {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    // TODO(htowaileb): Set default expiration value
+    private static final int EXPIRATION_MINUTES = 15;
+    // TODO(htowaileb): At what % (and should be configurable?) do we decide it's better to refresh credentials
+    private static final double REFRESH_THRESHOLD = 0.5;
+    private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
+
+    public ExternalCredentialsCache() {
+    }
+
+    /**
+     * Looks up the credentials cached for the entity described by {@code configuration}.
+     *
+     * @param configuration configuration the cache key is derived from
+     * @return the cached credentials, or {@code null} if nothing is cached or the entry is due for a refresh
+     */
+    @Override
+    public synchronized Object getCredentials(Map<String, String> configuration) {
+        // one lookup serves both the presence check and the value access
+        Pair<Span, Object> entry = cache.get(getName(configuration));
+        if (entry == null || needsRefresh(entry.getLeft())) {
+            return null;
+        }
+        return entry.getRight();
+    }
+
+    /**
+     * Caches the provided credentials. Only the AWS S3 reader type is handled; any other type is ignored.
+     *
+     * @param configuration configuration the reader type and the cache key are derived from
+     * @param credentials   credential fields, keyed by the {@link S3Constants} field names
+     */
+    @Override
+    public synchronized void updateCache(Map<String, String> configuration, Map<String, String> credentials) {
+        String type = configuration.get(ExternalDataConstants.KEY_READER);
+        if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equalsIgnoreCase(type)) {
+            updateAwsCache(configuration, credentials);
+        }
+    }
+
+    /**
+     * Builds the cache key as {@code database.dataverse.dataset}; the default database is used when the
+     * configuration does not name one.
+     *
+     * @param configuration configuration holding the database, dataverse and dataset names
+     * @return the cache key
+     */
+    @Override
+    public String getName(Map<String, String> configuration) {
+        String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
+        if (database == null) {
+            database = MetadataConstants.DEFAULT_DATABASE;
+        }
+        String dataverse = configuration.get(ExternalDataConstants.KEY_DATASET_DATAVERSE);
+        String dataset = configuration.get(ExternalDataConstants.KEY_DATASET);
+        return String.join(".", database, dataverse, dataset);
+    }
+
+    private void updateAwsCache(Map<String, String> configuration, Map<String, String> credentials) {
+        String accessKeyId = credentials.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = credentials.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = credentials.get(S3Constants.SESSION_TOKEN_FIELD_NAME);
+        doUpdateAwsCache(configuration, AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+    }
+
+    private void doUpdateAwsCache(Map<String, String> configuration, AwsSessionCredentials credentials) {
+        String name = getName(configuration);
+        cache.put(name, Pair.of(Span.start(EXPIRATION_MINUTES, TimeUnit.MINUTES), credentials));
+        LOGGER.info("Received and cached new credentials for {}", name);
+    }
+
+    /**
+     * Checks whether the ratio of remaining time to total time, both taken in minutes, has dropped
+     * below {@link #REFRESH_THRESHOLD}.
+     *
+     * @param span expiration span
+     * @return true if the entry should be refreshed, false otherwise
+     */
+    private boolean needsRefresh(Span span) {
+        return (double) span.remaining(TimeUnit.MINUTES) / span.getSpan(TimeUnit.MINUTES) < REFRESH_THRESHOLD;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
new file mode 100644
index 0000000..f07caaa
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.external;
+
+import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE;
+import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED;
+import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.RefreshAwsCredentialsRequest;
+import org.apache.asterix.app.message.RefreshAwsCredentialsResponse;
+import org.apache.asterix.app.message.UpdateAwsCredentialsCacheRequest;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+/**
+ * Obtains credentials for external data access and keeps the credentials caches up to date. On the CC the
+ * credentials are generated and pushed to the participant NCs; on an NC they are requested from the CC.
+ */
+public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCacheUpdater {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IApplicationContext appCtx;
+
+    public ExternalCredentialsCacheUpdater(IApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+
+    /**
+     * Returns the locally cached credentials when they are still usable; otherwise obtains new ones. On the CC
+     * the role is assumed, the participant NCs are asked to update their caches and the local cache is updated.
+     * On an NC the credentials are requested from the primary CC and the reply is awaited.
+     *
+     * @param configuration configuration of the entity the credentials are needed for
+     * @return the credentials
+     * @throws HyracksDataException if messaging between CC and NC fails, times out or is interrupted
+     * @throws CompilationException if assuming the role fails on the CC
+     */
+    @Override
+    public synchronized Object generateAndCacheCredentials(Map<String, String> configuration)
+            throws HyracksDataException, CompilationException {
+        IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache();
+        Object credentials = cache.getCredentials(configuration);
+        if (credentials != null) {
+            return credentials;
+        }
+
+        /*
+         * if we are the CC, generate new creds and ask all NCs to update their cache
+         * if we are the NC, send a message to the CC to generate new creds and ask all NCs to update their cache
+         */
+        String name = cache.getName(configuration);
+        if (appCtx instanceof ICcApplicationContext) {
+            ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx;
+            IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState();
+            if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) {
+                throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state);
+            }
+
+            String accessKeyId;
+            String secretAccessKey;
+            String sessionToken;
+            Map<String, String> credentialsMap = new HashMap<>();
+            try {
+                LOGGER.info("attempting to update credentials for {}", name);
+                AwsCredentialsProvider newCredentials = S3AuthUtils.assumeRoleAndGetCredentials(configuration);
+                LOGGER.info("updated credentials successfully for {}", name);
+                AwsSessionCredentials sessionCredentials = (AwsSessionCredentials) newCredentials.resolveCredentials();
+                accessKeyId = sessionCredentials.accessKeyId();
+                secretAccessKey = sessionCredentials.secretAccessKey();
+                sessionToken = sessionCredentials.sessionToken();
+            } catch (CompilationException ex) {
+                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                throw ex;
+            }
+
+            // package the new credentials for the NCs and for the local cache
+            credentialsMap.put(S3Constants.ACCESS_KEY_ID_FIELD_NAME, accessKeyId);
+            credentialsMap.put(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME, secretAccessKey);
+            credentialsMap.put(S3Constants.SESSION_TOKEN_FIELD_NAME, sessionToken);
+
+            // request all NCs to update their credentials cache with the latest creds
+            updateNcsCredentialsCache(ccAppCtx, name, credentialsMap, configuration);
+            cache.updateCache(configuration, credentialsMap);
+            credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken);
+        } else {
+            NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+            MessageFuture messageFuture = broker.registerMessageFuture();
+            String nodeId = ((INCServiceContext) appCtx.getServiceContext()).getNodeId();
+            long futureId = messageFuture.getFutureId();
+            RefreshAwsCredentialsRequest request = new RefreshAwsCredentialsRequest(nodeId, futureId, configuration);
+            try {
+                LOGGER.info("no valid credentials found for {}, requesting credentials from CC", name);
+                broker.sendMessageToPrimaryCC(request);
+                RefreshAwsCredentialsResponse response = (RefreshAwsCredentialsResponse) messageFuture
+                        .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                if (response.getFailure() != null) {
+                    throw HyracksDataException.create(response.getFailure());
+                }
+                credentials = AwsSessionCredentials.create(response.getAccessKeyId(), response.getSecretAccessKey(),
+                        response.getSessionToken());
+            } catch (Exception ex) {
+                if (ex instanceof InterruptedException) {
+                    // restore the interrupt status so that code further up the stack can still observe it
+                    Thread.currentThread().interrupt();
+                }
+                LOGGER.info("failed to refresh credentials for {}", name, ex);
+                throw HyracksDataException.create(ex);
+            } finally {
+                broker.deregisterMessageFuture(futureId);
+            }
+        }
+        return credentials;
+    }
+
+    /**
+     * Sends the new credentials to every participant NC so that each of them can update its cache.
+     *
+     * @throws HyracksDataException if a message cannot be sent
+     */
+    private void updateNcsCredentialsCache(ICcApplicationContext appCtx, String name, Map<String, String> credentials,
+            Map<String, String> configuration) throws HyracksDataException {
+        final List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+        CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        UpdateAwsCredentialsCacheRequest request = new UpdateAwsCredentialsCacheRequest(configuration, credentials);
+
+        try {
+            LOGGER.info("requesting all NCs to update their credentials for {}", name);
+            for (String nc : ncs) {
+                broker.sendApplicationMessageToNC(request, nc);
+            }
+        } catch (Exception e) {
+            LOGGER.info("failed to send message to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
new file mode 100644
index 0000000..32de92d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+
+/**
+ * CC-addressed message asking for refreshed AWS credentials. The CC replies to the requesting node with a
+ * {@link RefreshAwsCredentialsResponse} that carries either the credentials or the failure.
+ */
+public class RefreshAwsCredentialsRequest implements ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final String nodeId;
+    private final long reqId;
+    private final Map<String, String> configuration;
+
+    /**
+     * Creates a request on behalf of the given node.
+     *
+     * @param nodeId        id of the node the response is sent to
+     * @param reqId         id that is echoed back in the response
+     * @param configuration configuration handed to the credentials cache updater
+     */
+    public RefreshAwsCredentialsRequest(String nodeId, long reqId, Map<String, String> configuration) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Obtains credentials through the CC's credentials cache updater and sends them to the requesting node;
+     * any failure along the way is logged and reported to the node in the response instead.
+     */
+    @Override
+    public final void handle(ICcApplicationContext appCtx) throws HyracksDataException {
+        try {
+            IExternalCredentialsCacheUpdater updater = appCtx.getExternalCredentialsCacheUpdater();
+            AwsSessionCredentials refreshed =
+                    (AwsSessionCredentials) updater.generateAndCacheCredentials(configuration);
+
+            // success: hand the three credential parts back, with no failure attached
+            respond(appCtx, new RefreshAwsCredentialsResponse(reqId, refreshed.accessKeyId(),
+                    refreshed.secretAccessKey(), refreshed.sessionToken(), null));
+        } catch (Exception e) {
+            LOGGER.info("failed to refresh credentials", e);
+            // failure: no credentials, only the cause
+            respond(appCtx, new RefreshAwsCredentialsResponse(reqId, null, null, null, e));
+        }
+    }
+
+    /**
+     * Sends {@code response} to the node that issued this request.
+     */
+    private void respond(ICcApplicationContext appCtx, RefreshAwsCredentialsResponse response)
+            throws HyracksDataException {
+        CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.info("failed to send reply to nc", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
new file mode 100644
index 0000000..9ea0e11
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+
+/**
+ * NC-addressed reply to a {@link RefreshAwsCredentialsRequest}. It carries either the three credential parts
+ * or the failure, and completes the message future registered under the request id on the receiving node.
+ */
+public class RefreshAwsCredentialsResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+    private final String accessKeyId;
+    private final String secretAccessKey;
+    private final String sessionToken;
+    private final Throwable failure;
+
+    public RefreshAwsCredentialsResponse(long reqId, String accessKeyId, String secretAccessKey, String sessionToken,
+            Throwable failure) {
+        this.reqId = reqId;
+        this.accessKeyId = accessKeyId;
+        this.secretAccessKey = secretAccessKey;
+        this.sessionToken = sessionToken;
+        this.failure = failure;
+    }
+
+    /**
+     * Completes the message future registered under the request id; does nothing if none is registered.
+     */
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        NCMessageBroker messageBroker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        MessageFuture pending = messageBroker.deregisterMessageFuture(reqId);
+        if (pending == null) {
+            return;
+        }
+        pending.complete(this);
+    }
+
+    public String getAccessKeyId() {
+        return accessKeyId;
+    }
+
+    public String getSecretAccessKey() {
+        return secretAccessKey;
+    }
+
+    public String getSessionToken() {
+        return sessionToken;
+    }
+
+    public Throwable getFailure() {
+        return failure;
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
new file mode 100644
index 0000000..44d4c21
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+
+/**
+ * NC-addressed message carrying credentials that the receiving node stores in its external credentials cache.
+ */
+public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Map<String, String> configuration;
+    private final Map<String, String> credentials;
+
+    /**
+     * Creates the message.
+     *
+     * @param configuration configuration passed to the cache together with the credentials
+     * @param credentials   credentials to cache
+     */
+    public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, Map<String, String> credentials) {
+        this.configuration = configuration;
+        this.credentials = credentials;
+    }
+
+    /**
+     * Updates the external credentials cache of the receiving node with the carried credentials.
+     */
+    @Override
+    public void handle(INcApplicationContext appCtx) {
+        appCtx.getExternalCredentialsCache().updateCache(this.configuration, this.credentials);
+    }
+
+    @Override
+    public boolean isWhispered() {
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7da3838..8c2a0ab 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -32,6 +32,8 @@
import java.util.concurrent.ExecutorService;
import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.app.external.ExternalCredentialsCache;
+import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.cloud.LocalPartitionBootstrapper;
@@ -63,6 +65,8 @@
import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
import org.apache.asterix.common.context.GlobalVirtualBufferCache;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -186,6 +190,8 @@
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
private IDiskCacheMonitoringService diskCacheService;
+ protected IExternalCredentialsCache externalCredentialsCache;
+ protected IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory, INamespaceResolver namespaceResolver,
@@ -210,6 +216,8 @@
cacheManager = new CacheManager();
this.namespacePathResolver = namespacePathResolver;
this.namespaceResolver = namespaceResolver;
+ this.externalCredentialsCache = new ExternalCredentialsCache();
+ this.externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
}
@Override
@@ -748,4 +756,14 @@
return isCloudDeployment() ? storageProperties.getStoragePartitionsCount()
: ncServiceContext.getIoManager().getIODevices().size();
}
+
+ @Override
+ public IExternalCredentialsCache getExternalCredentialsCache() {
+ return externalCredentialsCache;
+ }
+
+ @Override
+ public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() {
+ return externalCredentialsCacheUpdater;
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 589ee79..198b3ad 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -30,6 +30,7 @@
import org.apache.asterix.cloud.WriterSingleBufferProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -60,17 +61,17 @@
writeBufferSize = externalConfig.getWriteBufferSize();
}
- abstract ICloudClient createCloudClient() throws CompilationException;
+ abstract ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException;
abstract boolean isNoContainerFoundException(IOException e);
abstract boolean isSdkException(Throwable e);
- final void buildClient() throws HyracksDataException {
+ final void buildClient(IApplicationContext appCtx) throws HyracksDataException {
try {
synchronized (this) {
if (cloudClient == null) {
- cloudClient = createCloudClient();
+ cloudClient = createCloudClient(appCtx);
}
}
} catch (CompilationException e) {
@@ -79,8 +80,8 @@
}
@Override
- public final void validate() throws AlgebricksException {
- ICloudClient testClient = createCloudClient();
+ public final void validate(IApplicationContext appCtx) throws AlgebricksException {
+ ICloudClient testClient = createCloudClient(appCtx);
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
if (bucket == null || bucket.isEmpty()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 886f20d..63a8366 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -24,6 +24,7 @@
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -61,7 +62,7 @@
}
@Override
- ICloudClient createCloudClient() throws CompilationException {
+ ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
@@ -80,7 +81,7 @@
@Override
public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException {
- buildClient();
+ buildClient(((IApplicationContext) context.getJobletContext().getServiceContext().getApplicationContext()));
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IExternalPrinter printer = printerFactory.createPrinter();
IWarningCollector warningCollector = context.getWarningCollector();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
index e07acc0..d268efd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java
@@ -24,9 +24,10 @@
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -61,9 +62,9 @@
}
@Override
- ICloudClient createCloudClient() throws CompilationException {
+ ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
S3ClientConfig config = S3ClientConfig.of(configuration, writeBufferSize);
- return new S3CloudClient(config, S3Utils.buildAwsS3Client(configuration),
+ return new S3CloudClient(config, S3AuthUtils.buildAwsS3Client(appCtx, configuration),
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
@@ -80,7 +81,7 @@
@Override
public IExternalFileWriter createWriter(IHyracksTaskContext context, IExternalPrinterFactory printerFactory)
throws HyracksDataException {
- buildClient();
+ buildClient(((IApplicationContext) context.getJobletContext().getServiceContext().getApplicationContext()));
String bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IExternalPrinter printer = printerFactory.createPrinter();
IWarningCollector warningCollector = context.getWarningCollector();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index eabebf7..19e4ad7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -29,6 +29,8 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -106,4 +108,14 @@
INamespaceResolver getNamespaceResolver();
INamespacePathResolver getNamespacePathResolver();
+
+ /**
+ * @return external credentials cache
+ */
+ IExternalCredentialsCache getExternalCredentialsCache();
+
+ /**
+ * @return external credentials cache updater
+ */
+ IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 0cf6eb2..35e0699 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -314,6 +314,8 @@
MAXIMUM_VALUE_ALLOWED_FOR_PARAM(1209),
STORAGE_SIZE_NOT_APPLICABLE_TO_TYPE(1210),
COULD_NOT_CREATE_TOKENS(1211),
+ NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION(1212),
+ FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION(1213),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
new file mode 100644
index 0000000..245b350
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+public interface IExternalCredentialsCache {
+
+ /**
+ * Returns the credentials cached for the given configuration. Can be of any supported external credentials type
+ *
+ * @param configuration configuration containing external collection details
+ * @return credentials if present, null otherwise
+ */
+ Object getCredentials(Map<String, String> configuration);
+
+ /**
+ * Updates the credentials cache with the provided credentials for the entity described by the configuration
+ *
+ * @param configuration configuration containing external collection details
+ * @param credentials credentials to cache, given as string key/value pairs
+ */
+ void updateCache(Map<String, String> configuration, Map<String, String> credentials);
+
+ /**
+ * Returns the name of the entity which the cached credentials belong to
+ *
+ * @param configuration configuration containing external collection details
+ * @return name of entity which credentials belong to
+ */
+ String getName(Map<String, String> configuration);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
new file mode 100644
index 0000000..48553c0
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCacheUpdater.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IExternalCredentialsCacheUpdater {
+
+ /**
+ * Generates new credentials, caches them and returns them to the caller
+ *
+ * @param configuration configuration containing external collection details
+ */
+ Object generateAndCacheCredentials(Map<String, String> configuration)
+ throws HyracksDataException, CompilationException;
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index adb26fe..d15a751 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -316,6 +316,8 @@
1209 = Maximum value allowed for '%1$s' is %2$s. Found %3$s
1210 = Retrieving storage size is not applicable to type: %1$s.
1211 = Could not create delegation tokens
+1212 = No credentials found for cross-account authentication. Expected instance profile or access key id & secret access key for assuming role
+1213 = Failed to perform cross-account authentication. Encountered error : '%1$s'
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 7a1bdad..138b364 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -26,13 +26,14 @@
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -48,14 +49,16 @@
public class AwsS3InputStream extends AbstractExternalInputStream {
// Configuration
+ private final IApplicationContext ncAppCtx;
private final String bucket;
private final S3Client s3Client;
private ResponseInputStream<?> s3InStream;
private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
- public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths,
+ public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths,
IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
super(configuration, filePaths, valueEmbedder);
+ this.ncAppCtx = ncAppCtx;
this.s3Client = buildAwsS3Client(configuration);
this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@@ -113,7 +116,7 @@
}
private boolean shouldRetry(String errorCode, int currentRetry) {
- return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode);
+ return currentRetry < MAX_RETRIES && S3AuthUtils.isRetryableError(errorCode);
}
@Override
@@ -141,7 +144,7 @@
private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
try {
- return S3Utils.buildAwsS3Client(configuration);
+ return S3AuthUtils.buildAwsS3Client(ncAppCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 36d21d1..e9c72e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,8 +48,10 @@
public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
int partition = context.getPartition();
- return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
- valueEmbedder);
+ IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+ .getServiceContext().getApplicationContext();
+ return new AwsS3InputStream(ncAppCtx, configuration,
+ partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
}
@Override
@@ -65,7 +68,8 @@
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
// get the items
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+ IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
+ List<S3Object> filesOnly = S3Utils.listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index abed33a..7ddbab91 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.aws.parquet;
-import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -73,7 +74,8 @@
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- List<S3Object> filesOnly = listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+ IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+ List<S3Object> filesOnly = listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
path = buildPathURIs(container, filesOnly);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index e758f64..1de2cd2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -39,6 +39,7 @@
// used to specify the stream factory for an adapter that has a stream data source
public static final String KEY_STREAM = "stream";
//TODO(DB): check adapter configuration
+ public static final String KEY_DATASET = "dataset";
public static final String KEY_DATASET_DATABASE = "dataset-database";
// used to specify the dataverse of the adapter
public static final String KEY_DATASET_DATAVERSE = "dataset-dataverse";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8c72a8c..1d5f2ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -74,8 +74,8 @@
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3AuthUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.external.util.azure.blob_storage.AzureConstants;
import org.apache.asterix.external.util.google.gcs.GCSConstants;
import org.apache.asterix.om.types.ARecordType;
@@ -675,7 +675,7 @@
switch (type) {
case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
- S3Utils.validateProperties(configuration, srcLoc, collector);
+ S3AuthUtils.validateProperties(appCtx, configuration, srcLoc, collector);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
new file mode 100644
index 0000000..2ba1844
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalCredentialsCache;
+import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.CleanupUtils;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Response;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+public class S3AuthUtils {
+ private S3AuthUtils() { // private and always throwing: this class is not meant to be instantiated
+ throw new AssertionError("do not instantiate");
+ }
+
+    public static boolean isRetryableError(String errorCode) {
+        return ERROR_INTERNAL_ERROR.equals(errorCode) || ERROR_SLOW_DOWN.equals(errorCode); // null => false
+    }
+
+    /**
+     * Creates an S3 client for the region, credentials and optional service endpoint found in the configuration
+     *
+     * @param appCtx application context, passed on when building the credentials provider
+     * @param configuration properties
+     * @return S3 client
+     * @throws CompilationException if the region, authentication parameters or service endpoint are invalid
+     */
+    public static S3Client buildAwsS3Client(IApplicationContext appCtx, Map<String, String> configuration)
+            throws CompilationException {
+        // the region is validated before the credentials are resolved, which fixes the error reported first
+        Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
+        AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration);
+        S3ClientBuilder builder = S3Client.builder();
+        builder.region(region);
+        builder.crossRegionAccessEnabled(true);
+        builder.credentialsProvider(credentialsProvider);
+
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint == null) {
+            return builder.build();
+        }
+        // a custom endpoint has to be a syntactically valid URI that the client builder accepts
+        URI endpointUri;
+        try {
+            endpointUri = new URI(serviceEndpoint);
+        } catch (URISyntaxException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                    String.format("Invalid service endpoint %s", serviceEndpoint));
+        }
+        try {
+            builder.endpointOverride(endpointUri);
+        } catch (NullPointerException ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+        }
+        return builder.build();
+    }
+
+    /** Selects the credentials provider matching the authentication parameters present in the configuration. */
+    public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        // precedence: anonymous -> assume role -> instance profile -> access keys
+        if (noAuth(configuration)) {
+            return AnonymousCredentialsProvider.create();
+        }
+        if (configuration.get(ROLE_ARN_FIELD_NAME) != null) {
+            return getTrustAccountCredentials(appCtx, configuration);
+        }
+        if (configuration.get(INSTANCE_PROFILE_FIELD_NAME) != null) {
+            return getInstanceProfileCredentials(configuration);
+        }
+        boolean hasAccessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME) != null;
+        boolean hasSecretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME) != null;
+        if (hasAccessKeyId || hasSecretAccessKey) {
+            return getAccessKeyCredentials(configuration);
+        }
+        // only an external id and/or a session token is set: name the parameter that it requires
+        if (configuration.get(EXTERNAL_ID_FIELD_NAME) != null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+                    EXTERNAL_ID_FIELD_NAME);
+        }
+        throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                SESSION_TOKEN_FIELD_NAME);
+    }
+
+    /**
+     * Resolves a region id against the regions listed in the S3 service metadata
+     * @throws CompilationException if no listed region has the given id
+     */
+    public static Region validateAndGetRegion(String regionId) throws CompilationException {
+        List<Region> supportedRegions = S3Client.serviceMetadata().regions();
+        Optional<Region> match = supportedRegions.stream().filter(r -> r.id().equals(regionId)).findFirst();
+        return match.orElseThrow(() -> new CompilationException(S3_REGION_NOT_SUPPORTED, regionId));
+    }
+
+ private static boolean noAuth(Map<String, String> configuration) { // true if no auth parameter is set
+ return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
+ ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null;
+ }
+
+    /**
+     * Provides credentials for cross-account access, reusing cached credentials before assuming the role again
+     *
+     * @param appCtx application context
+     * @param configuration configuration
+     * @return provider of the cached credentials if the cache has any, otherwise of newly generated credentials
+     * @throws CompilationException CompilationException
+     */
+    public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx,
+            Map<String, String> configuration) throws CompilationException {
+        IExternalCredentialsCache credentialsCache = appCtx.getExternalCredentialsCache();
+        Object cached = credentialsCache.getCredentials(configuration);
+        if (cached != null) {
+            return () -> (AwsSessionCredentials) cached;
+        }
+        // cache miss: have the updater generate (and cache) new credentials
+        IExternalCredentialsCacheUpdater updater = appCtx.getExternalCredentialsCacheUpdater();
+        try {
+            AwsSessionCredentials generated =
+                    (AwsSessionCredentials) updater.generateAndCacheCredentials(configuration);
+            return () -> generated;
+        } catch (HyracksDataException ex) {
+            throw new CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, ex, ex.getMessage());
+        }
+    }
+
+ /**
+ * Assumes the role of the configured role ARN through STS and returns the resulting temporary credentials
+ *
+ * @param configuration configuration
+ * @return static provider wrapping the session credentials of the assume role response
+ * @throws CompilationException on an unsupported region, missing base credentials or a failed STS request
+ */
+ public static AwsCredentialsProvider assumeRoleAndGetCredentials(Map<String, String> configuration)
+ throws CompilationException {
+ String regionId = configuration.get(REGION_FIELD_NAME);
+ String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+ String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+ Region region = validateAndGetRegion(regionId);
+
+ AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
+ builder.roleArn(arnRole);
+ builder.roleSessionName(UUID.randomUUID().toString());
+ builder.durationSeconds(900); // TODO(htowaileb): configurable? Can be 900 to 43200 (15 mins to 12 hours)
+ if (externalId != null) {
+ builder.externalId(externalId);
+ }
+
+ // credentials used to call STS: the instance profile takes precedence over the access keys
+ AwsCredentialsProvider credentialsProvider;
+ AssumeRoleRequest request = builder.build();
+ String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+ if ("true".equalsIgnoreCase(instanceProfile)) {
+ credentialsProvider = getInstanceProfileCredentials(configuration, true);
+ } else if (accessKeyId != null && secretAccessKey != null) {
+ credentialsProvider = getAccessKeyCredentials(configuration, true);
+ } else {
+ throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+ }
+
+ // assume the role from the provided arn; the STS client is closed once the response has been read
+ try (StsClient stsClient =
+ StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) {
+ AssumeRoleResponse response = stsClient.assumeRole(request);
+ Credentials credentials = response.credentials();
+ return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
+ credentials.secretAccessKey(), credentials.sessionToken()));
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ }
+ }
+
+ private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+ throws CompilationException {
+ return getInstanceProfileCredentials(configuration, false); // false: not used for assuming a role
+ }
+
+    /** Returns the instance profile credentials provider after validating the related configuration parameters. */
+    private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        boolean enabled = "true".equalsIgnoreCase(configuration.get(INSTANCE_PROFILE_FIELD_NAME));
+        if (!enabled) {
+            // "true" is the only accepted value for this parameter
+            throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
+        }
+
+        // outside of an assume role flow, explicit keys or a session token must not accompany the instance profile
+        String conflicting = assumeRoleAuthentication ? null : getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME,
+                SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME);
+        if (conflicting != null) {
+            throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, conflicting,
+                    INSTANCE_PROFILE_FIELD_NAME);
+        }
+
+        return InstanceProfileCredentialsProvider.create();
+    }
+
+ private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration)
+ throws CompilationException {
+ return getAccessKeyCredentials(configuration, false); // false: not used for assuming a role
+ }
+
+    /** Builds static credentials from the access keys (and the session token, if any) after validating them. */
+    private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration,
+            boolean assumeRoleAuthentication) throws CompilationException {
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+
+        // the access key id and the secret access key must be provided together
+        if (accessKeyId == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                    SECRET_ACCESS_KEY_FIELD_NAME);
+        }
+        if (secretAccessKey == null) {
+            throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+                    ACCESS_KEY_ID_FIELD_NAME);
+        }
+
+        if (!assumeRoleAuthentication) {
+            String notAllowed = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
+            if (notAllowed != null) {
+                // the conflict is with the access key (which is present here), not with the instance profile
+                throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+                        ACCESS_KEY_ID_FIELD_NAME);
+            }
+        }
+
+        // use session token if provided
+        return StaticCredentialsProvider.create(sessionToken != null
+                ? AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken)
+                : AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+    }
+
+ /**
+ * Returns the first of the provided field names that has a non-null value in the configuration
+ *
+ * @param configuration properties
+ * @param fieldNames field names to look up, checked in the given order
+ * @return the first field name mapped to a non-null value, or null if there is none
+ */
+ private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
+ for (int i = 0; i < fieldNames.length; i++) {
+ String candidate = fieldNames[i];
+ if (configuration.get(candidate) == null) {
+ continue;
+ }
+ return candidate;
+ }
+ return null;
+ }
+
+ /**
+ * Populates the provided job configuration with the hadoop-aws settings derived from the provided
+ * configuration: credentials, path-style access, connection pool size and service endpoint
+ *
+ * @param conf job configuration to populate
+ * @param configuration properties
+ * @param numberOfPartitions number of partitions in the cluster
+ */
+ public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
+ int numberOfPartitions) {
+ // The S3 FileSystem is not to be cached
+ HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
+
+ /*
+ * Authentication Methods:
+ * 1- Anonymous: no accessKeyId and no secretAccessKey
+ * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
+ * 3- Private: has to provide accessKeyId and secretAccessKey
+ */
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ if (accessKeyId != null) {
+ conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
+ conf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+ String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+ if (sessionToken != null) {
+ conf.set(HADOOP_SESSION_TOKEN, sessionToken);
+ // a session token means temporary access, let hadoop-aws know
+ conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
+ }
+ } else {
+ // no access key id means anonymous access, let hadoop-aws know
+ conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
+ }
+
+ /*
+ * Path-style access is always enabled to match the way files are currently accessed in S3
+ */
+ conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
+
+ /*
+ * The S3 connection pool is sized to the number of partitions
+ */
+ conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+
+ /*
+ * A provided service endpoint is passed as is, its validation is left to hadoop-aws. Without one, the
+ * central endpoint is used: the region is ignored and buckets can be found through it
+ */
+ String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+ String endpoint = serviceEndpoint != null ? serviceEndpoint : Constants.CENTRAL_ENDPOINT;
+ conf.set(HADOOP_SERVICE_END_POINT, endpoint);
+ }
+
+ /**
+ * Validate external dataset properties
+ *
+ * @param appCtx application context, passed on when building the S3 client
+ * @param configuration properties
+ * @param srcLoc source location attached to the reported error and warning
+ * @param collector warning collector, warned when the listing returns no files
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration,
+ SourceLocation srcLoc, IWarningCollector collector) throws CompilationException {
+ if (isDeltaTable(configuration)) {
+ validateDeltaTableProperties(configuration);
+ }
+ // check if the format property is present
+ else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
+ String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+
+ if (arnRole != null) {
+ // NOTE(review): with a role ARN none of the checks below run (include/exclude, prefix, bucket
+ // listing) -- presumably intentional for assume-role authentication; confirm
+ return;
+ } else if (externalId != null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+ EXTERNAL_ID_FIELD_NAME);
+ } else if (accessKeyId == null || secretAccessKey == null) {
+ // If one is passed, the other is required
+ if (accessKeyId != null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+ ACCESS_KEY_ID_FIELD_NAME);
+ } else if (secretAccessKey != null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+ SECRET_ACCESS_KEY_FIELD_NAME);
+ }
+ }
+
+ validateIncludeExclude(configuration);
+ try {
+ // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
+ new ExternalDataPrefix(configuration);
+ } catch (AlgebricksException ex) {
+ throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
+ }
+
+ // Check if the bucket is present
+ S3Client s3Client = buildAwsS3Client(appCtx, configuration);
+ S3Response response;
+ boolean useOldApi = false;
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ String prefix = getPrefix(configuration);
+
+ try {
+ response = S3Utils.isBucketEmpty(s3Client, container, prefix, false);
+ } catch (S3Exception ex) {
+ // Method not implemented, try falling back to old API
+ try {
+ // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+ // the error details may be absent, in which case the exception is rethrown and wrapped below
+ String errorCode = ex.awsErrorDetails() != null ? ex.awsErrorDetails().errorCode() : null;
+ if (ERROR_METHOD_NOT_IMPLEMENTED.equals(errorCode)) {
+ useOldApi = true;
+ response = S3Utils.isBucketEmpty(s3Client, container, prefix, true);
+ } else {
+ throw ex;
+ }
+ } catch (SdkException ex2) {
+ // report the message of the exception that is actually wrapped (the rethrown one or the
+ // failure of the fallback call)
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex2));
+ }
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
+ }
+
+ boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
+ : ((ListObjectsV2Response) response).contents().isEmpty();
+ if (isEmpty && collector.shouldWarn()) {
+ Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+
+ // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
+ // ensure coverage, check if the result is successful as well and not only catch exceptions
+ if (!response.sdkHttpResponse().isSuccessful()) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 3cfccb4..a2b50e1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -18,48 +18,18 @@
*/
package org.apache.asterix.external.util.aws.s3;
-import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
-import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
-import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
-import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SESSION_TOKEN_FIELD_NAME;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -67,26 +37,13 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.util.CleanupUtils;
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
@@ -96,233 +53,12 @@
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.S3Response;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
public class S3Utils {
private S3Utils() {
throw new AssertionError("do not instantiate");
}
- public static boolean isRetryableError(String errorCode) {
- return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @return S3 client
- * @throws CompilationException CompilationException
- */
- public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
- String regionId = configuration.get(REGION_FIELD_NAME);
- String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
- Region region = validateAndGetRegion(regionId);
- AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(configuration);
-
- S3ClientBuilder builder = S3Client.builder();
- builder.region(region);
- builder.credentialsProvider(credentialsProvider);
-
- // Validate the service endpoint if present
- if (serviceEndpoint != null) {
- try {
- URI uri = new URI(serviceEndpoint);
- try {
- builder.endpointOverride(uri);
- } catch (NullPointerException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
- }
- } catch (URISyntaxException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
- String.format("Invalid service endpoint %s", serviceEndpoint));
- }
- }
-
- return builder.build();
- }
-
- public static AwsCredentialsProvider buildCredentialsProvider(Map<String, String> configuration)
- throws CompilationException {
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
- if (noAuth(configuration)) {
- return AnonymousCredentialsProvider.create();
- } else if (arnRole != null) {
- // TODO: Do auth validation and use existing credentials if exist already, if not, assume the role
- return validateAndGetTrustAccountAuthentication(configuration);
- } else if (instanceProfile != null) {
- return validateAndGetInstanceProfileAuthentication(configuration);
- } else if (accessKeyId != null || secretAccessKey != null) {
- return validateAndGetAccessKeysAuthentications(configuration);
- } else {
- if (externalId != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
- EXTERNAL_ID_FIELD_NAME);
- } else {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- }
- }
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @param numberOfPartitions number of partitions in the cluster
- */
- public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
- int numberOfPartitions) {
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
- String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
- //Disable caching S3 FileSystem
- HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
- /*
- * Authentication Methods:
- * 1- Anonymous: no accessKeyId and no secretAccessKey
- * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
- * 3- Private: has to provide accessKeyId and secretAccessKey
- */
- if (accessKeyId == null) {
- //Tells hadoop-aws it is an anonymous access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
- } else {
- conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
- conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
- if (sessionToken != null) {
- conf.set(HADOOP_SESSION_TOKEN, sessionToken);
- //Tells hadoop-aws it is a temporary access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
- }
- }
-
- /*
- * This is to allow S3 definition to have path-style form. Should always be true to match the current
- * way we access files in S3
- */
- conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
- /*
- * Set the size of S3 connection pool to be the number of partitions
- */
- conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
-
- if (serviceEndpoint != null) {
- // Validation of the URL should be done at hadoop-aws level
- conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
- } else {
- //Region is ignored and buckets could be found by the central endpoint
- conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
- }
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
- if (isDeltaTable(configuration)) {
- validateDeltaTableProperties(configuration);
- }
- // check if the format property is present
- else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
-
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
- if (arnRole != null) {
- String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- } else if (externalId != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
- EXTERNAL_ID_FIELD_NAME);
- } else if (accessKeyId == null || secretAccessKey == null) {
- // If one is passed, the other is required
- if (accessKeyId != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- } else if (secretAccessKey != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- }
-
- validateIncludeExclude(configuration);
- try {
- // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
- new ExternalDataPrefix(configuration);
- } catch (AlgebricksException ex) {
- throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
- }
-
- // Check if the bucket is present
- S3Client s3Client = buildAwsS3Client(configuration);
- S3Response response;
- boolean useOldApi = false;
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- String prefix = getPrefix(configuration);
-
- try {
- response = isBucketEmpty(s3Client, container, prefix, false);
- } catch (S3Exception ex) {
- // Method not implemented, try falling back to old API
- try {
- // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- useOldApi = true;
- response = isBucketEmpty(s3Client, container, prefix, true);
- } else {
- throw ex;
- }
- } catch (SdkException ex2) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex));
- }
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
- } finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
- }
-
- boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
- : ((ListObjectsV2Response) response).contents().isEmpty();
- if (isEmpty && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
-
- // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
- // ensure coverage, check if the result is successful as well and not only catch exceptions
- if (!response.sdkHttpResponse().isSuccessful()) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
- }
- }
-
/**
* Checks for a single object in the specified bucket to determine if the bucket is empty or not.
*
@@ -332,7 +68,7 @@
* @param useOldApi flag whether to use the old API or not
* @return returns the S3 response
*/
- private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
+ protected static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
S3Response response;
if (useOldApi) {
ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
@@ -352,14 +88,14 @@
* @param configuration properties
* @param includeExcludeMatcher include/exclude matchers to apply
*/
- public static List<S3Object> listS3Objects(Map<String, String> configuration,
+ public static List<S3Object> listS3Objects(IApplicationContext appCtx, Map<String, String> configuration,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
// Prepare to retrieve the objects
List<S3Object> filesOnly;
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, configuration);
String prefix = getPrefix(configuration);
try {
@@ -502,10 +238,11 @@
}
}
- public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String, String> configuration, String container,
- String prefix) throws CompilationException {
+ public static Map<String, List<String>> S3ObjectsOfSingleDepth(IApplicationContext appCtx,
+ Map<String, String> configuration, String container, String prefix)
+ throws CompilationException, HyracksDataException {
// create s3 client
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = S3AuthUtils.buildAwsS3Client(appCtx, configuration);
// fetch all the s3 objects
return listS3ObjectsOfSingleDepth(s3Client, container, prefix);
}
@@ -555,116 +292,4 @@
allObjects.put("folders", folders);
return allObjects;
}
-
- public static Region validateAndGetRegion(String regionId) throws CompilationException {
- List<Region> regions = S3Client.serviceMetadata().regions();
- Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
-
- if (selectedRegion.isEmpty()) {
- throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
- }
- return selectedRegion.get();
- }
-
- // TODO(htowaileb): Currently, trust-account is always assuming we have instance profile setup in place
- private static AwsCredentialsProvider validateAndGetTrustAccountAuthentication(Map<String, String> configuration)
- throws CompilationException {
- String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
-
- String regionId = configuration.get(REGION_FIELD_NAME);
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- Region region = validateAndGetRegion(regionId);
-
- AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder();
- builder.roleArn(arnRole);
- builder.roleSessionName(UUID.randomUUID().toString());
- builder.durationSeconds(900); // minimum role assume duration = 900 seconds (15 minutes), make configurable?
- if (externalId != null) {
- builder.externalId(externalId);
- }
- AssumeRoleRequest request = builder.build();
- AwsCredentialsProvider credentialsProvider = validateAndGetInstanceProfileAuthentication(configuration);
-
- // TODO(htowaileb): We shouldn't assume role with each request, rather stored the received temporary credentials
- // and refresh when expired.
- // assume the role from the provided arn
- try (StsClient stsClient =
- StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) {
- AssumeRoleResponse response = stsClient.assumeRole(request);
- Credentials credentials = response.credentials();
- return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(),
- credentials.secretAccessKey(), credentials.sessionToken()));
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
- }
- }
-
- private static AwsCredentialsProvider validateAndGetInstanceProfileAuthentication(Map<String, String> configuration)
- throws CompilationException {
- String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-
- // only "true" value is allowed
- if (!"true".equalsIgnoreCase(instanceProfile)) {
- throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
- }
-
- String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- return InstanceProfileCredentialsProvider.create();
- }
-
- private static AwsCredentialsProvider validateAndGetAccessKeysAuthentications(Map<String, String> configuration)
- throws CompilationException {
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
-
- // accessKeyId authentication
- if (accessKeyId == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- if (secretAccessKey == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- }
-
- String notAllowed = getNonNull(configuration, EXTERNAL_ID_FIELD_NAME);
- if (notAllowed != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
-
- // use session token if provided
- if (sessionToken != null) {
- return StaticCredentialsProvider
- .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
- } else {
- return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
- }
- }
-
- private static boolean noAuth(Map<String, String> configuration) {
- return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null;
- }
-
- private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
- for (String fieldName : fieldNames) {
- if (configuration.get(fieldName) != null) {
- return fieldName;
- }
- }
- return null;
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
index dc20a89..a4113d1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/HDFSExternalFileWriterFactory.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.UUID;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -123,7 +124,7 @@
}
@Override
- public void validate() throws AlgebricksException {
+ public void validate(IApplicationContext appCtx) throws AlgebricksException {
Configuration conf = HDFSUtils.configureHDFSwrite(configuration);
credentials = HDFSUtils.configureHadoopAuthentication(configuration, conf);
try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
index bdefaa8..b1d3a95 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/LocalFSExternalFileWriterFactory.java
@@ -20,6 +20,7 @@
import java.io.File;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -80,7 +81,7 @@
}
@Override
- public void validate() throws AlgebricksException {
+ public void validate(IApplicationContext appCtx) throws AlgebricksException {
// A special case validation for a single node cluster
if (singleNodeCluster && staticPath != null) {
if (isNonEmptyDirectory(new File(staticPath))) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c152853..ef056c9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -994,6 +994,7 @@
Map<String, String> configuration, ARecordType itemType, IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException {
try {
+ configuration.put(ExternalDataConstants.KEY_DATASET, dataset.getDatasetName());
configuration.put(ExternalDataConstants.KEY_DATASET_DATABASE, dataset.getDatabaseName());
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 1caea58..e6716df 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -148,7 +148,7 @@
String staticPath = staticPathExpr != null ? ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
IExternalFileWriterFactory fileWriterFactory =
ExternalWriterProvider.createWriterFactory(appCtx, sink, staticPath, pathSourceLocation);
- fileWriterFactory.validate();
+ fileWriterFactory.validate(appCtx);
String fileExtension = ExternalWriterProvider.getFileExtension(sink);
int maxResult = ExternalWriterProvider.getMaxResult(sink);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
index 4a75db6..2aeca4f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/IExternalWriterFactoryValidator.java
@@ -18,11 +18,12 @@
*/
package org.apache.asterix.runtime.writer;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
public interface IExternalWriterFactoryValidator {
/**
* Perform the necessary validation to ensure the writer has the proper permissions
*/
- void validate() throws AlgebricksException;
+ void validate(IApplicationContext appCtx) throws AlgebricksException;
}