[ASTERIXDB-3514][EXT]: Support trust auth for parquet + delete assumed creds on collection drop
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Support using trusted credentials for reading S3 parquet
files, since that path uses a different code path to build the client.
- Delete the temporarily generated credentials when the
collection is dropped.
Ext-ref: MB-63505
Change-Id: I77998a5dfcc304692e12280b7b4018f3593085b9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19246
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index e892d04..9cecaa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -183,7 +183,7 @@
this.globalTxManager = globalTxManager;
this.ioManager = ioManager;
dataPartitioningProvider = DataPartitioningProvider.create(this);
- externalCredentialsCache = new ExternalCredentialsCache();
+ externalCredentialsCache = new ExternalCredentialsCache(this);
externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index 0ddca4e..66fbdec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalCredentialsCache;
import org.apache.asterix.common.metadata.MetadataConstants;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -38,8 +39,12 @@
private static final Logger LOGGER = LogManager.getLogger();
private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
+ private final int awsAssumeRoleDuration;
+ private final double refreshAwsAssumeRoleThreshold;
- public ExternalCredentialsCache() {
+ public ExternalCredentialsCache(IApplicationContext appCtx) {
+ this.awsAssumeRoleDuration = appCtx.getExternalProperties().getAwsAssumeRoleDuration();
+ this.refreshAwsAssumeRoleThreshold = appCtx.getExternalProperties().getAwsRefreshAssumeRoleThreshold();
}
@Override
@@ -60,6 +65,11 @@
}
@Override
+ public void deleteCredentials(String name) {
+ cache.remove(name);
+ }
+
+ @Override
public String getName(Map<String, String> configuration) {
String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
if (database == null) {
@@ -78,20 +88,19 @@
}
private void doUpdateAwsCache(Map<String, String> configuration, AwsSessionCredentials credentials) {
- // TODO(htowaileb): Set default expiration value
String name = getName(configuration);
- cache.put(name, Pair.of(Span.start(15, TimeUnit.MINUTES), credentials));
+ cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials));
LOGGER.info("Received and cached new credentials for {}", name);
}
/**
- * Refresh if the remaining time is half or less than the total expiration time
+ * Refresh if the remaining time is less than the configured refresh percentage
*
* @param span expiration span
- * @return true if the remaining time is half or less than the total expiration time, false otherwise
+ * @return true if the remaining time is less than the configured refresh percentage, false otherwise
*/
private boolean needsRefresh(Span span) {
- // TODO(htowaileb): At what % (and should be configurable?) do we decide it's better to refresh credentials
- return (double) span.remaining(TimeUnit.MINUTES) / span.getSpan(TimeUnit.MINUTES) < 0.5;
+ return (double) span.remaining(TimeUnit.SECONDS)
+ / span.getSpan(TimeUnit.SECONDS) < refreshAwsAssumeRoleThreshold;
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 8c2a0ab..343baf0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -216,7 +216,7 @@
cacheManager = new CacheManager();
this.namespacePathResolver = namespacePathResolver;
this.namespaceResolver = namespaceResolver;
- this.externalCredentialsCache = new ExternalCredentialsCache();
+ this.externalCredentialsCache = new ExternalCredentialsCache(this);
this.externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ed93838..4ee674e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2448,6 +2448,8 @@
sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+ appCtx.getExternalCredentialsCache()
+ .deleteCredentials(String.join(".", databaseName, dataverseName.getCanonicalForm(), datasetName));
return true;
} catch (Exception e) {
LOGGER.error("failed to drop dataset; executing compensating operations", e);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 7c963a7..29a4a6e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -8,6 +8,8 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
+ "aws.assume.role.duration" : 900,
+ "aws.refresh.assume.role.threshold" : 0.5,
"azure.request.timeout" : 120,
"cloud.acquire.token.timeout" : 100,
"cloud.deployment" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 50927b4..dbbf83f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -8,6 +8,8 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
+ "aws.assume.role.duration" : 900,
+ "aws.refresh.assume.role.threshold" : 0.5,
"azure.request.timeout" : 120,
"cloud.acquire.token.timeout" : 100,
"cloud.deployment" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 1b035a0..4778cb8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -8,6 +8,8 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
+ "aws.assume.role.duration" : 900,
+ "aws.refresh.assume.role.threshold" : 0.5,
"azure.request.timeout" : 120,
"cloud.acquire.token.timeout" : 100,
"cloud.deployment" : false,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 8aa4532..f6559f0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.config;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL;
import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
@@ -52,7 +53,16 @@
"The maximum accepted web request size in bytes"),
REQUESTS_ARCHIVE_SIZE(NONNEGATIVE_INTEGER, 1000, "The maximum number of archived requests to maintain"),
LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"),
- AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds");
+ AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds"),
+ AWS_ASSUME_ROLE_DURATION(
+ POSITIVE_INTEGER,
+ 900,
+ "AWS assuming role duration in seconds. "
+ + "Range from 900 seconds (15 mins) to 43200 seconds (12 hours)"),
+ AWS_REFRESH_ASSUME_ROLE_THRESHOLD(
+ DOUBLE,
+ .5,
+ "Percentage of left duration before assume role credentials " + "needs to be refreshed");
private final IOptionType type;
private final Object defaultValue;
@@ -80,6 +90,8 @@
case MAX_WEB_REQUEST_SIZE:
case LIBRARY_DEPLOY_TIMEOUT:
case AZURE_REQUEST_TIMEOUT:
+ case AWS_ASSUME_ROLE_DURATION:
+ case AWS_REFRESH_ASSUME_ROLE_THRESHOLD:
return Section.COMMON;
case CC_JAVA_OPTS:
case NC_JAVA_OPTS:
@@ -160,4 +172,12 @@
public int getAzureRequestTimeout() {
return accessor.getInt(Option.AZURE_REQUEST_TIMEOUT);
}
+
+ public int getAwsAssumeRoleDuration() {
+ return accessor.getInt(Option.AWS_ASSUME_ROLE_DURATION);
+ }
+
+ public double getAwsRefreshAssumeRoleThreshold() {
+ return accessor.getDouble(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 245b350..c603893 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -39,6 +39,13 @@
void updateCache(Map<String, String> configuration, Map<String, String> credentials);
/**
+ * Deletes the cache for the provided entity name
+ *
+ * @param name name of the entity for which the credentials are to be deleted
+ */
+ void deleteCredentials(String name);
+
+ /**
* Returns the name of the entity which the cached credentials belong to
*
* @param configuration configuration containing external collection details
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 138b364..f950feb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.aws;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.hyracks.util.LogRedactionUtil.userData;
import java.io.IOException;
import java.util.List;
@@ -37,7 +38,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.util.LogRedactionUtil;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
@@ -48,12 +48,11 @@
public class AwsS3InputStream extends AbstractExternalInputStream {
- // Configuration
+ private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
private final IApplicationContext ncAppCtx;
private final String bucket;
- private final S3Client s3Client;
+ private S3Client s3Client;
private ResponseInputStream<?> s3InStream;
- private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths,
IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
@@ -85,7 +84,7 @@
*
* @return true
*/
- private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+ private boolean doGetInputStream(GetObjectRequest request) throws HyracksDataException {
int retries = 0;
while (retries < MAX_RETRIES) {
try {
@@ -93,14 +92,18 @@
in = s3InStream;
break;
} catch (NoSuchKeyException ex) {
- LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
- + request.bucket());
+ LOGGER.debug(() -> "Key " + userData(request.key()) + " was not found in bucket {}" + request.bucket());
return false;
} catch (S3Exception ex) {
- if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+ if (S3AuthUtils.isArnAssumedRoleExpiredToken(configuration, ex.awsErrorDetails().errorCode())) {
+ LOGGER.debug(() -> "Expired AWS assume role session, will attempt to refresh the session");
+ rebuildAwsS3Client(configuration);
+ LOGGER.debug(() -> "Successfully refreshed AWS assume role session");
+ } else if (shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+ LOGGER.debug(() -> "S3 retryable error: " + userData(ex.getMessage()));
+ } else {
throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
}
- LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
// Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
try {
@@ -149,4 +152,8 @@
throw HyracksDataException.create(ex);
}
}
+
+ private void rebuildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
+ s3Client = buildAwsS3Client(configuration);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index ba0d0f4..7d678a9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hadoop.mapred.JobConf;
@@ -34,8 +36,9 @@
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
@Override
- protected void configureJobConf(JobConf conf, Map<String, String> configuration) {
- configureAwsS3HdfsJobConf(conf, configuration);
+ protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
+ throws CompilationException {
+ configureAwsS3HdfsJobConf(appCtx, conf, configuration);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 790db8c..e3313f7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -30,6 +30,7 @@
import java.util.PriorityQueue;
import java.util.Set;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -84,8 +85,8 @@
return locationConstraints;
}
- protected abstract void configureJobConf(JobConf conf, Map<String, String> configuration)
- throws AlgebricksException;
+ protected abstract void configureJobConf(IApplicationContext appCtx, JobConf conf,
+ Map<String, String> configuration) throws AlgebricksException;
protected abstract String getTablePath(Map<String, String> configuration) throws AlgebricksException;
@@ -95,7 +96,7 @@
throws AlgebricksException, HyracksDataException {
JobConf conf = new JobConf();
ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
- configureJobConf(conf, configuration);
+ configureJobConf(appCtx, conf, configuration);
confFactory = new ConfFactory(conf);
String tableMetadataPath = getTablePath(configuration);
Engine engine = DefaultEngine.create(conf);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 7ddbab91..2d92e10 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -87,7 +87,9 @@
try {
JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
- configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+
+ IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+ configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
} catch (SdkException | SdkBaseException ex) {
throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
index ee88569..db7673c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -34,7 +35,8 @@
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
@Override
- protected void configureJobConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
+ protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
+ throws AlgebricksException {
GCSUtils.configureHdfsJobConf(conf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 6767f93..82b5dad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -519,12 +519,13 @@
}
}
- public static void validateDeltaTableExists(Map<String, String> configuration) throws AlgebricksException {
+ public static void validateDeltaTableExists(IApplicationContext appCtx, Map<String, String> configuration)
+ throws AlgebricksException {
String tableMetadataPath = null;
JobConf conf = new JobConf();
if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
.equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
- configureAwsS3HdfsJobConf(conf, configuration);
+ configureAwsS3HdfsJobConf(appCtx, conf, configuration);
tableMetadataPath = S3Utils.getPath(configuration);
} else if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
.equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 45988e8..2ed16a1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -28,20 +28,28 @@
import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_EXPIRED_TOKEN;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_REGION;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE;
import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
@@ -64,7 +72,6 @@
import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
-import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -94,6 +101,14 @@
import software.amazon.awssdk.services.sts.model.Credentials;
public class S3AuthUtils {
+ enum AuthenticationType {
+ ANONYMOUS,
+ ARN,
+ INSTANCE_PROFILE,
+ ACCESS_KEYS,
+ BAD_AUTHENTICATION
+ }
+
private S3AuthUtils() {
throw new AssertionError("do not instantiate");
}
@@ -102,6 +117,10 @@
return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
}
+ public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) {
+ return ERROR_EXPIRED_TOKEN.equals(errorCode) && getAuthenticationType(configuration) == AuthenticationType.ARN;
+ }
+
/**
* Builds the S3 client using the provided configuration
*
@@ -119,7 +138,6 @@
S3ClientBuilder builder = S3Client.builder();
builder.region(region);
- builder.crossRegionAccessEnabled(true);
builder.credentialsProvider(credentialsProvider);
// Validate the service endpoint if present
@@ -142,28 +160,26 @@
public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx,
Map<String, String> configuration) throws CompilationException {
- String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
- String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
- String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
- if (noAuth(configuration)) {
- return AnonymousCredentialsProvider.create();
- } else if (arnRole != null) {
- return getTrustAccountCredentials(appCtx, configuration);
- } else if (instanceProfile != null) {
- return getInstanceProfileCredentials(configuration);
- } else if (accessKeyId != null || secretAccessKey != null) {
- return getAccessKeyCredentials(configuration);
- } else {
- if (externalId != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
- EXTERNAL_ID_FIELD_NAME);
- } else {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- }
+ AuthenticationType authenticationType = getAuthenticationType(configuration);
+ switch (authenticationType) {
+ case ANONYMOUS:
+ return AnonymousCredentialsProvider.create();
+ case ARN:
+ return getTrustAccountCredentials(appCtx, configuration);
+ case INSTANCE_PROFILE:
+ return getInstanceProfileCredentials(configuration);
+ case ACCESS_KEYS:
+ return getAccessKeyCredentials(configuration);
+ default:
+ // missing required creds, report correct error message
+ String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+ if (externalId != null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+ EXTERNAL_ID_FIELD_NAME);
+ } else {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+ SESSION_TOKEN_FIELD_NAME);
+ }
}
}
@@ -177,6 +193,25 @@
return selectedRegion.get();
}
+ private static AuthenticationType getAuthenticationType(Map<String, String> configuration) {
+ String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
+ String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+
+ if (noAuth(configuration)) {
+ return AuthenticationType.ANONYMOUS;
+ } else if (roleArn != null) {
+ return AuthenticationType.ARN;
+ } else if (instanceProfile != null) {
+ return AuthenticationType.INSTANCE_PROFILE;
+ } else if (accessKeyId != null || secretAccessKey != null) {
+ return AuthenticationType.ACCESS_KEYS;
+ } else {
+ return AuthenticationType.BAD_AUTHENTICATION;
+ }
+ }
+
private static boolean noAuth(Map<String, String> configuration) {
return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null;
@@ -327,64 +362,83 @@
return null;
}
    /**
     * Convenience overload that configures the hadoop job conf for S3 access without setting the
     * S3 connection pool size (a {@code numberOfPartitions} of 0 leaves the pool size untouched
     * in the 4-arg variant).
     *
     * @param appCtx application context
     * @param conf hadoop job configuration to populate
     * @param configuration external details configuration
     * @throws CompilationException if the configuration's credentials are invalid or incomplete
     */
    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf conf,
            Map<String, String> configuration) throws CompilationException {
        configureAwsS3HdfsJobConf(appCtx, conf, configuration, 0);
    }
+
    /**
     * Configures the hadoop job conf so hadoop-aws (s3a) can access the bucket: sets the
     * credentials/provider (including assumed-role for trusted access), the region, the service
     * endpoint (falling back to the central endpoint), path-style access, and optionally the
     * connection pool size.
     *
     * NOTE(review): the previous implementation called HDFSUtils.disableHadoopFileSystemCache
     * before configuring credentials; that call is no longer made here — confirm that cached
     * FileSystem instances cannot serve stale/foreign credentials across jobs.
     *
     * @param appCtx application context (currently unused in this overload — presumably kept for
     *            signature symmetry with other cloud configurators; confirm)
     * @param configuration properties
     * @param numberOfPartitions number of partitions in the cluster
     */
    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf,
            Map<String, String> configuration, int numberOfPartitions) throws CompilationException {
        // choose and set the credentials provider class + credentials per the auth type
        setHadoopCredentials(jobConf, configuration);
        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
        Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
        jobConf.set(HADOOP_REGION, region.toString());
        if (serviceEndpoint != null) {
            // Validation of the URL should be done at hadoop-aws level
            jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
        } else {
            //Region is ignored and buckets could be found by the central endpoint
            jobConf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
        }
        /*
         * This is to allow S3 definition to have path-style form. Should always be true to match the current
         * way we access files in S3
         */
        jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
        /*
         * Set the size of S3 connection pool to be the number of partitions
         */
        if (numberOfPartitions != 0) {
            jobConf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
        }
    }
- if (serviceEndpoint != null) {
- // Validation of the URL should be done at hadoop-aws level
- conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
- } else {
- //Region is ignored and buckets could be found by the central endpoint
- conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
+ /**
+ * Sets the credentials provider type and the credentials to hadoop based on the provided configuration
+ *
+ * @param jobConf hadoop job config
+ * @param configuration external details configuration
+ */
+ private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration) {
+ AuthenticationType authenticationType = getAuthenticationType(configuration);
+ switch (authenticationType) {
+ case ANONYMOUS:
+ jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
+ break;
+ case ARN:
+ jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE);
+ jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME));
+ jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME));
+ jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID());
+ jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, "15m");
+
+ // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role
+ jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE);
+ jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME));
+ jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+ break;
+ case INSTANCE_PROFILE:
+ jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_INSTANCE_PROFILE);
+ break;
+ case ACCESS_KEYS:
+ jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SIMPLE);
+ jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME));
+ jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+ if (configuration.get(SESSION_TOKEN_FIELD_NAME) != null) {
+ jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SESSION_TOKEN);
+ jobConf.set(HADOOP_SESSION_TOKEN, configuration.get(SESSION_TOKEN_FIELD_NAME));
+ }
+ break;
+ case BAD_AUTHENTICATION:
}
}
@@ -477,7 +531,7 @@
}
if (isDeltaTable(configuration)) {
try {
- validateDeltaTableExists(configuration);
+ validateDeltaTableExists(appCtx, configuration);
} catch (AlgebricksException e) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 126c868..95c5040 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -23,6 +23,7 @@
throw new AssertionError("do not instantiate");
}
+ // Authentication specific parameters
public static final String REGION_FIELD_NAME = "region";
public static final String INSTANCE_PROFILE_FIELD_NAME = "instanceProfile";
public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
@@ -36,11 +37,15 @@
public static final String ERROR_INTERNAL_ERROR = "InternalError";
public static final String ERROR_SLOW_DOWN = "SlowDown";
public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+ public static final String ERROR_EXPIRED_TOKEN = "ExpiredToken";
/*
* Hadoop-AWS
- * AWS connectors for s3 and s3n are deprecated.
*/
+ public static final String HADOOP_ASSUME_ROLE_ARN = "fs.s3a.assumed.role.arn";
+ public static final String HADOOP_ASSUME_ROLE_EXTERNAL_ID = "fs.s3a.assumed.role.external.id";
+ public static final String HADOOP_ASSUME_ROLE_SESSION_NAME = "fs.s3a.assumed.role.session.name";
+ public static final String HADOOP_ASSUME_ROLE_SESSION_DURATION = "fs.s3a.assumed.role.session.duration";
public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
@@ -58,10 +63,14 @@
//S3 used protocol
public static final String HADOOP_S3_PROTOCOL = "s3a";
- //Hadoop credentials provider key
+ // hadoop credentials provider key
public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider";
- //Anonymous credential provider
- public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
- //Temporary credential provider
- public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
+ public static final String HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY = "fs.s3a.assumed.role.credentials.provider";
+
+ // credential providers
+ public static final String HADOOP_ANONYMOUS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
+ public static final String HADOOP_ASSUMED_ROLE = "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider";
+ public static final String HADOOP_INSTANCE_PROFILE = "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider";
+ public static final String HADOOP_SIMPLE = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider";
+ public static final String HADOOP_TEMPORARY = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index bfd35fc..481b7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -222,17 +222,17 @@
}
}
    /**
     * Convenience overload that configures the hadoop job conf for GCS access, delegating to the
     * 4-arg variant with {@code numberOfPartitions} of 0.
     *
     * @param conf hadoop job configuration to populate
     * @param configuration external details configuration
     * @throws AlgebricksException if the configuration is invalid
     */
    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
            throws AlgebricksException {
        configureHdfsJobConf(conf, configuration, 0);
    }
+
/**
* Builds the client using the provided configuration
*
* @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
- public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
- throws AlgebricksException {
- configureHdfsJobConf(conf, configuration, 0);
- }
-
public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
throws AlgebricksException {
String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
index 1bc8eb8..9ad93df 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -48,7 +49,7 @@
List<Row> scanFiles = createMockRows(rowCount);
DeltaReaderFactory d = new DeltaReaderFactory() {
@Override
- protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+ protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
throws AlgebricksException {
}
@@ -75,7 +76,7 @@
List<Row> scanFiles = createMockRows(rowCount);
DeltaReaderFactory d = new DeltaReaderFactory() {
@Override
- protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+ protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
throws AlgebricksException {
}
@@ -102,7 +103,7 @@
List<Row> scanFiles = createMockRows(rowCount);
DeltaReaderFactory d = new DeltaReaderFactory() {
@Override
- protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+ protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
throws AlgebricksException {
}