[ASTERIXDB-3565][EXT]: Add impersonate service account auth for GCS
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Add support for impersonating a service account as
an authentication method for GCS links. This works
by providing a target service account to impersonate
and source credentials (ours) to use for performing
the impersonation. There is no need to store
any temporary credentials/tokens as the SDK automatically
picks up the generated token and, if it has expired,
automatically refreshes it for subsequent requests.
Ext-ref: MB-65121
Change-Id: Ie1d69faa45a03550c8e0fe66eb18c2ae53a8454a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19444
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
index 4c382d0..10f536b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java
@@ -78,14 +78,15 @@
String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
- credentials = generateAwsCredentials(configuration);
+ return generateAwsCredentials(configuration);
+ } else {
+ // this should never happen
+ throw new IllegalArgumentException("Unsupported external source type: " + type);
}
-
- return credentials;
}
// TODO: this can probably be refactored out into something that is AWS-specific
- private Object generateAwsCredentials(Map<String, String> configuration)
+ private AwsSessionCredentials generateAwsCredentials(Map<String, String> configuration)
throws HyracksDataException, CompilationException {
String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID);
AwsSessionCredentials credentials;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 3ffb080..c6d7ba9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -61,6 +61,7 @@
"compiler\.textsearchmemory" : 163840,
"compiler\.windowmemory" : 196608,
"default\.dir" : "target/io/dir/asterixdb",
+ "gcp\.impersonate\.service\.account\.duration" : 900,
"library\.deploy\.timeout" : 1800,
"log\.dir" : "logs/",
"log\.level" : "DEBUG",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index aeb3cea..562e195 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -61,6 +61,7 @@
"compiler\.textsearchmemory" : 163840,
"compiler\.windowmemory" : 196608,
"default\.dir" : "target/io/dir/asterixdb",
+ "gcp\.impersonate\.service\.account\.duration" : 900,
"library\.deploy\.timeout" : 1800,
"log\.dir" : "logs/",
"log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index b475612..132fa5b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -61,6 +61,7 @@
"compiler\.textsearchmemory" : 163840,
"compiler\.windowmemory" : 196608,
"default\.dir" : "target/io/dir/asterixdb",
+ "gcp\.impersonate\.service\.account\.duration" : 900,
"library\.deploy\.timeout" : 1800,
"log\.dir" : "logs/",
"log\.level" : "WARN",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 3a34365..02e2881 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -27,7 +27,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
import org.apache.asterix.runtime.writer.IExternalFileWriter;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
@@ -66,7 +66,7 @@
@Override
ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
- return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
+ return new GCSCloudClient(config, GCSAuthUtils.buildClient(appCtx, configuration),
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 62437b2..d233606 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -64,7 +64,12 @@
75,
"Percentage of duration passed before assume role credentials need to be refreshed, the value ranges "
+ "from 25 to 90, default is 75. For example, if the value is set to 65, this means the "
- + "credentials need to be refreshed if 65% of the total expiration duration is already passed");
+ + "credentials need to be refreshed if 65% of the total expiration duration is already passed"),
+ GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION(
+ getRangedIntegerType(60, 3600),
+ 900,
+ "GCS impersonating service account duration in seconds. "
+ + "Range from 60 seconds (1 min) to 3600 seconds (1 hour)");
private final IOptionType type;
private final Object defaultValue;
@@ -94,6 +99,7 @@
case AZURE_REQUEST_TIMEOUT:
case AWS_ASSUME_ROLE_DURATION:
case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE:
+ case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION:
return Section.COMMON;
case CC_JAVA_OPTS:
case NC_JAVA_OPTS:
@@ -182,4 +188,8 @@
public int getAwsRefreshAssumeRoleThresholdPercentage() {
return accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE);
}
+
+ public int getGcpImpersonateServiceAccountDuration() {
+ return accessor.getInt(Option.GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 46d63bb..463695e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -326,6 +326,8 @@
CSV_INVALID_FORCE_QUOTE(1218),
CSV_INVALID_ESCAPE(1219),
CANNOT_TRUNCATE_DATASET_TYPE(1220),
+ NO_VALID_AUTHENTICATION_PARAMS_PROVIDED(1221),
+ NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT(1222),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index d6a171f..df08016 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -328,6 +328,8 @@
1218 = '%1$s' is not a valid force-quote input. The length of a force-quote input should be 1 character
1219 = '%1$s' is not a valid escape. The length of a escape should be 1
1220 = Cannot truncate %1$s '%2$s'
+1221 = No valid authentication parameters were provided
+1222 = No valid authentication parameters were provided to impersonate service account
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 89da065..1ef3fcd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -27,13 +27,14 @@
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -46,13 +47,15 @@
public class GCSInputStream extends AbstractExternalInputStream {
+ private final IApplicationContext ncAppCtx;
private final Storage client;
private final String container;
private static final int MAX_ATTEMPTS = 5; // We try a total of 5 times in case of retryable errors
- public GCSInputStream(Map<String, String> configuration, List<String> filePaths,
+ public GCSInputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths,
IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
super(configuration, filePaths, valueEmbedder);
+ this.ncAppCtx = ncAppCtx;
this.client = buildClient(configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@@ -136,7 +139,7 @@
private Storage buildClient(Map<String, String> configuration) throws HyracksDataException {
try {
- return GCSUtils.buildClient(configuration);
+ return GCSAuthUtils.buildClient(ncAppCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index b6ad3cd..2de55a0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
@@ -47,7 +48,9 @@
public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
int partition = context.getPartition();
- return new GCSInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
+ IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+ .getServiceContext().getApplicationContext();
+ return new GCSInputStream(ncAppCtx, configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(),
valueEmbedder);
}
@@ -65,7 +68,8 @@
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
// get the items
- List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector,
+ IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
+ List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
index db7673c..301bfc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -37,7 +38,8 @@
@Override
protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
throws AlgebricksException {
- GCSUtils.configureHdfsJobConf(conf, configuration);
+ int numberOfPartitions = getPartitionConstraint().getLocations().length;
+ GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 17cad3e..874c3bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
@@ -30,6 +31,7 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.google.gcs.GCSAuthUtils;
import org.apache.asterix.external.util.google.gcs.GCSConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.hadoop.mapred.JobConf;
@@ -59,7 +61,8 @@
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector,
+ IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+ List<Blob> filesOnly = GCSUtils.listItems(appCtx, configuration, includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
// get path
@@ -71,7 +74,7 @@
// configure hadoop input splits
JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
- GCSUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
+ GCSAuthUtils.configureHdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 0fed43f..591829b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,7 +35,7 @@
import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
-import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
+import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.configureHdfsJobConf;
import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -727,7 +727,7 @@
validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_GCS:
- validateProperties(configuration, srcLoc, collector);
+ validateProperties(appCtx, configuration, srcLoc, collector);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_HDFS:
HDFSUtils.validateProperties(configuration, srcLoc, collector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f6ca480..035415d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -105,7 +105,7 @@
public class S3AuthUtils {
enum AuthenticationType {
ANONYMOUS,
- ARN,
+ ARN_ASSUME_ROLE,
INSTANCE_PROFILE,
ACCESS_KEYS,
BAD_AUTHENTICATION
@@ -120,7 +120,8 @@
}
public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) {
- return ERROR_EXPIRED_TOKEN.equals(errorCode) && getAuthenticationType(configuration) == AuthenticationType.ARN;
+ return ERROR_EXPIRED_TOKEN.equals(errorCode)
+ && getAuthenticationType(configuration) == AuthenticationType.ARN_ASSUME_ROLE;
}
/**
@@ -168,7 +169,7 @@
switch (authenticationType) {
case ANONYMOUS:
return AnonymousCredentialsProvider.create();
- case ARN:
+ case ARN_ASSUME_ROLE:
return getTrustAccountCredentials(appCtx, configuration);
case INSTANCE_PROFILE:
return getInstanceProfileCredentials(configuration);
@@ -206,7 +207,7 @@
if (noAuth(configuration)) {
return AuthenticationType.ANONYMOUS;
} else if (roleArn != null) {
- return AuthenticationType.ARN;
+ return AuthenticationType.ARN_ASSUME_ROLE;
} else if (instanceProfile != null) {
return AuthenticationType.INSTANCE_PROFILE;
} else if (accessKeyId != null || secretAccessKey != null) {
@@ -232,11 +233,11 @@
}
/**
- * Returns the cached credentials if valid, otherwise, generates new credentials by assume a role
+ * Returns the cached credentials if valid, otherwise, generates new credentials
*
* @param appCtx application context
* @param configuration configuration
- * @return returns the cached credentials if valid, otherwise, generates new credentials by assume a role
+ * @return returns the cached credentials if valid, otherwise, generates new credentials
* @throws CompilationException CompilationException
*/
public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx,
@@ -278,20 +279,8 @@
if (externalId != null) {
builder.externalId(externalId);
}
-
- // credentials to be used to assume the role
- AwsCredentialsProvider credentialsProvider;
AssumeRoleRequest request = builder.build();
- String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- if ("true".equalsIgnoreCase(instanceProfile)) {
- credentialsProvider = getInstanceProfileCredentials(configuration, true);
- } else if (accessKeyId != null && secretAccessKey != null) {
- credentialsProvider = getAccessKeyCredentials(configuration, true);
- } else {
- throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
- }
+ AwsCredentialsProvider credentialsProvider = getCredentialsToAssumeRole(configuration);
// assume the role from the provided arn
try (StsClient stsClient =
@@ -305,13 +294,22 @@
}
}
- private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+ private static AwsCredentialsProvider getCredentialsToAssumeRole(Map<String, String> configuration)
throws CompilationException {
- return getInstanceProfileCredentials(configuration, false);
+ String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+ if (instanceProfile != null) {
+ return getInstanceProfileCredentials(configuration);
+ } else if (accessKeyId != null || secretAccessKey != null) {
+ return getAccessKeyCredentials(configuration);
+ } else {
+ throw new CompilationException(ErrorCode.NO_AWS_VALID_PARAMS_FOUND_FOR_CROSS_ACCOUNT_TRUST_AUTHENTICATION);
+ }
}
- private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration,
- boolean assumeRoleAuthentication) throws CompilationException {
+ private static AwsCredentialsProvider getInstanceProfileCredentials(Map<String, String> configuration)
+ throws CompilationException {
String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
// only "true" value is allowed
@@ -319,24 +317,17 @@
throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
}
- if (!assumeRoleAuthentication) {
- String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
- SESSION_TOKEN_FIELD_NAME);
- if (notAllowed != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
+ String notAllowed = getNonNull(configuration, ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME,
+ SESSION_TOKEN_FIELD_NAME);
+ if (notAllowed != null) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+ INSTANCE_PROFILE_FIELD_NAME);
}
return InstanceProfileCredentialsProvider.create();
}
private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration)
throws CompilationException {
- return getAccessKeyCredentials(configuration, false);
- }
-
- private static AwsCredentialsProvider getAccessKeyCredentials(Map<String, String> configuration,
- boolean assumeRoleAuthentication) throws CompilationException {
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
@@ -350,14 +341,6 @@
ACCESS_KEY_ID_FIELD_NAME);
}
- if (!assumeRoleAuthentication) {
- String notAllowed = getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, EXTERNAL_ID_FIELD_NAME);
- if (notAllowed != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
- INSTANCE_PROFILE_FIELD_NAME);
- }
- }
-
// use session token if provided
if (sessionToken != null) {
return StaticCredentialsProvider
@@ -423,13 +406,13 @@
* @param configuration external details configuration
*/
private static void setHadoopCredentials(IApplicationContext appCtx, JobConf jobConf,
- Map<String, String> configuration) {
+ Map<String, String> configuration) throws CompilationException {
AuthenticationType authenticationType = getAuthenticationType(configuration);
switch (authenticationType) {
case ANONYMOUS:
jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
break;
- case ARN:
+ case ARN_ASSUME_ROLE:
jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE);
jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME));
jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME));
@@ -458,6 +441,7 @@
}
break;
case BAD_AUTHENTICATION:
+ throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
new file mode 100644
index 0000000..4473ad7
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.HadoopAuthServiceAccount.IMPERSONATE_SERVICE_ACCOUNT;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ImpersonatedCredentials;
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+public class GCSAuthUtils {
+ enum AuthenticationType {
+ ANONYMOUS,
+ IMPERSONATE_SERVICE_ACCOUNT,
+ APPLICATION_DEFAULT_CREDENTIALS,
+ SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS,
+ BAD_AUTHENTICATION
+ }
+
+ private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper();
+ private static final List<String> READ_WRITE_SCOPE_PERMISSION =
+ Collections.singletonList("https://www.googleapis.com/auth/devstorage.read_write");
+ static {
+ JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+ }
+
+ private GCSAuthUtils() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ /**
+ * Builds the client using the provided configuration
+ *
+ * @param appCtx application context
+ * @param configuration properties
+ * @return Storage client
+ * @throws CompilationException CompilationException
+ */
+ public static Storage buildClient(IApplicationContext appCtx, Map<String, String> configuration)
+ throws CompilationException {
+ String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+ Credentials credentials = buildCredentials(appCtx, configuration);
+ StorageOptions.Builder builder = StorageOptions.newBuilder();
+ builder.setCredentials(credentials);
+
+ if (endpoint != null) {
+ builder.setHost(endpoint);
+ }
+
+ return builder.build().getService();
+ }
+
+ public static Credentials buildCredentials(IApplicationContext appCtx, Map<String, String> configuration) throws CompilationException {
+ AuthenticationType authenticationType = getAuthenticationType(configuration);
+ return switch (authenticationType) {
+ case ANONYMOUS -> NoCredentials.getInstance();
+ case IMPERSONATE_SERVICE_ACCOUNT -> getImpersonatedServiceAccountCredentials(appCtx, configuration);
+ case APPLICATION_DEFAULT_CREDENTIALS -> getApplicationDefaultCredentials(configuration);
+ case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS -> getServiceAccountKeyCredentials(configuration);
+ case BAD_AUTHENTICATION -> throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+ };
+ }
+
+ private static AuthenticationType getAuthenticationType(Map<String, String> configuration) {
+ String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+ String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+ String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+ if (noAuth(configuration)) {
+ return AuthenticationType.ANONYMOUS;
+ } else if (impersonateServiceAccount != null) {
+ return AuthenticationType.IMPERSONATE_SERVICE_ACCOUNT;
+ } else if (applicationDefaultCredentials != null) {
+ return AuthenticationType.APPLICATION_DEFAULT_CREDENTIALS;
+ } else if (jsonCredentials != null) {
+ return AuthenticationType.SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS;
+ } else {
+ return AuthenticationType.BAD_AUTHENTICATION;
+ }
+ }
+
+ private static boolean noAuth(Map<String, String> configuration) {
+ return getNonNull(configuration, APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, JSON_CREDENTIALS_FIELD_NAME,
+ IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME) == null;
+ }
+
+ /**
+ * Returns the cached credentials if valid, otherwise, generates new credentials
+ *
+ * @param appCtx application context
+ * @param configuration configuration
+ * @return returns the cached credentials if valid, otherwise, generates new credentials
+ * @throws CompilationException CompilationException
+ */
+ public static GoogleCredentials getImpersonatedServiceAccountCredentials(IApplicationContext appCtx,
+ Map<String, String> configuration) throws CompilationException {
+ GoogleCredentials sourceCredentials = getCredentialsToImpersonateServiceAccount(configuration);
+ String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+ int duration = appCtx.getExternalProperties().getGcpImpersonateServiceAccountDuration();
+
+ // Create impersonated credentials
+ return ImpersonatedCredentials.create(sourceCredentials, impersonateServiceAccount, null,
+ READ_WRITE_SCOPE_PERMISSION, duration);
+ }
+
+ private static GoogleCredentials getCredentialsToImpersonateServiceAccount(Map<String, String> configuration)
+ throws CompilationException {
+ String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+ String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+ if (applicationDefaultCredentials != null) {
+ return getApplicationDefaultCredentials(configuration);
+ } else if (jsonCredentials != null) {
+ return getServiceAccountKeyCredentials(configuration);
+ } else {
+ throw new CompilationException(
+ ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT);
+ }
+ }
+
+ private static GoogleCredentials getApplicationDefaultCredentials(Map<String, String> configuration)
+ throws CompilationException {
+ try {
+ String notAllowed = getNonNull(configuration, JSON_CREDENTIALS_FIELD_NAME);
+ if (notAllowed != null) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
+ APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
+ }
+ return GoogleCredentials.getApplicationDefault();
+ } catch (Exception ex) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ }
+ }
+
+ private static GoogleCredentials getServiceAccountKeyCredentials(Map<String, String> configuration)
+ throws CompilationException {
+ String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+ try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
+ return GoogleCredentials.fromStream(credentialsStream);
+ } catch (IOException ex) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
+ } catch (Exception ex) {
+ throw new CompilationException(EXTERNAL_SOURCE_ERROR,
+ "Encountered an issue while processing the JSON credentials. Please ensure the provided credentials are valid.");
+ }
+ }
+
+ public static void configureHdfsJobConf(JobConf jobConf, Map<String, String> configuration)
+ throws AlgebricksException {
+ configureHdfsJobConf(jobConf, configuration, 0);
+ }
+
+ /**
+ * Builds the client using the provided configuration
+ *
+ * @param configuration properties
+ * @param numberOfPartitions number of partitions
+ */
+ public static void configureHdfsJobConf(JobConf jobConf, Map<String, String> configuration, int numberOfPartitions)
+ throws AlgebricksException {
+ setHadoopCredentials(jobConf, configuration);
+
+ // set endpoint if provided, default is https://storage.googleapis.com/
+ String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+ if (endpoint != null) {
+ jobConf.set(HADOOP_ENDPOINT, endpoint);
+ }
+
+ // disable caching FileSystem
+ HDFSUtils.disableHadoopFileSystemCache(jobConf, HADOOP_GCS_PROTOCOL);
+
+ // TODO(htowaileb): make configurable, in case we hit rate limits then we can reduce it, default is 15
+ if (numberOfPartitions != 0) {
+ jobConf.set(GCSConstants.MAX_BATCH_THREADS, String.valueOf(numberOfPartitions));
+ }
+
+ // recommended to be disabled by GCP hadoop team
+ jobConf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, ExternalDataConstants.FALSE);
+ }
+
+ /**
+ * Sets the credentials provider type and the credentials to hadoop based on the provided configuration
+ *
+ * @param jobConf hadoop job config
+ * @param configuration external details configuration
+ * @throws CompilationException CompilationException
+ */
+ private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration)
+ throws CompilationException {
+ AuthenticationType authenticationType = getAuthenticationType(configuration);
+ switch (authenticationType) {
+ case ANONYMOUS:
+ jobConf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
+ break;
+ case IMPERSONATE_SERVICE_ACCOUNT:
+ String impersonateServiceAccount = configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
+ jobConf.set(IMPERSONATE_SERVICE_ACCOUNT, impersonateServiceAccount);
+ setJsonCredentials(jobConf, configuration);
+ break;
+ case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS:
+ setJsonCredentials(jobConf, configuration);
+ break;
+ case BAD_AUTHENTICATION:
+ throw new CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
+ }
+ }
+
+ /**
+ * Sets the Json credentials to hadoop job configuration
+ * Note:
+ * Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
+ * in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in
+ * version 3.x.y, which also removed support for hadoop-2
+ *
+ * @param jobConf hadoop job config
+ * @param configuration external details configuration
+ * @throws CompilationException CompilationException
+ */
+ private static void setJsonCredentials(JobConf jobConf, Map<String, String> configuration)
+ throws CompilationException {
+ try {
+ String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+ JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
+ jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
+ jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
+ jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
+ jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
+ jobConf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
+ jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
+ } catch (JsonProcessingException e) {
+ throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, "Unable to parse Json Credentials",
+ getMessageOrToString(e));
+ }
+ }
+
+ private static String getNonNull(Map<String, String> configuration, String... fieldNames) {
+ for (String fieldName : fieldNames) {
+ if (configuration.get(fieldName) != null) {
+ return fieldName;
+ }
+ }
+ return null;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 739dbde..19fd74f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -40,23 +40,21 @@
}
public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = "applicationDefaultCredentials";
+ public static final String IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME = "impersonateServiceAccount";
public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
public static final String ENDPOINT_FIELD_NAME = "endpoint";
public static final String STORAGE_PREFIX = "prefix";
- /*
- * Hadoop internal configuration
- */
+ // hadoop internal configuration
public static final String HADOOP_GCS_PROTOCOL = "gs";
+ public static final String MAX_BATCH_THREADS = "fs.gs.batch.threads";
+ public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
+ public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable";
// hadoop credentials
public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type";
public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED";
- // gs hadoop parameters
- public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable";
- public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
-
public static class JsonCredentials {
public static final String PRIVATE_KEY_ID = "private_key_id";
public static final String PRIVATE_KEY = "private_key";
@@ -64,6 +62,7 @@
}
public static class HadoopAuthServiceAccount {
+ public static final String IMPERSONATE_SERVICE_ACCOUNT = "fs.gs.auth.impersonation.service.account";
public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id";
public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key";
public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index d768c23..032f537 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -18,32 +18,20 @@
*/
package org.apache.asterix.external.util.google.gcs;
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
-import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
-import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
-import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSAuthUtils.buildClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -51,25 +39,15 @@
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.HDFSUtils;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.json.JsonReadFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.BaseServiceException;
-import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
public class GCSUtils {
private GCSUtils() {
@@ -77,73 +55,17 @@
}
- private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new ObjectMapper();
- static {
- JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
- }
-
- /**
- * Builds the client using the provided configuration
- *
- * @param configuration properties
- * @return clientasterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
- * @throws CompilationException CompilationException
- */
- public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
- String applicationDefaultCredentials = configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
- String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
- String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
- StorageOptions.Builder builder = StorageOptions.newBuilder();
- builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
-
- // default credentials provider
- if (applicationDefaultCredentials != null) {
- // only "true" value is allowed
- if (!applicationDefaultCredentials.equalsIgnoreCase("true")) {
- throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
- APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, "true");
- }
-
- // no other authentication parameters are allowed
- if (jsonCredentials != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, JSON_CREDENTIALS_FIELD_NAME,
- APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
- }
-
- try {
- builder.setCredentials(GoogleCredentials.getApplicationDefault());
- } catch (Exception ex) {
- throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
- }
- } else if (jsonCredentials != null) {
- try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
- builder.setCredentials(GoogleCredentials.fromStream(credentialsStream));
- } catch (IOException ex) {
- throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
- } catch (Exception ex) {
- throw new CompilationException(EXTERNAL_SOURCE_ERROR,
- "Encountered an issue while processing the JSON credentials. Please ensure the provided credentials are valid.");
- }
- } else {
- builder.setCredentials(NoCredentials.getInstance());
- }
-
- if (endpoint != null) {
- builder.setHost(endpoint);
- }
-
- return builder.build().getService();
- }
-
/**
* Validate external dataset properties
*
+ * @param appCtx application context
* @param configuration properties
+ * @param srcLoc source location
+ * @param collector warning collector
* @throws CompilationException Compilation exception
*/
- public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
+ public static void validateProperties(IApplicationContext appCtx, Map<String, String> configuration,
+ SourceLocation srcLoc, IWarningCollector collector) throws CompilationException {
if (isDeltaTable(configuration)) {
validateDeltaTableProperties(configuration);
}
@@ -154,7 +76,6 @@
validateIncludeExclude(configuration);
try {
- // TODO(htowaileb): maybe something better, this will check to ensure type is supported before creation
new ExternalDataPrefix(configuration);
} catch (AlgebricksException ex) {
throw new CompilationException(ErrorCode.FAILED_TO_CALCULATE_COMPUTED_FIELDS, ex);
@@ -162,36 +83,33 @@
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- try {
+ try (Storage storage = buildClient(appCtx, configuration)) {
Storage.BlobListOption limitOption = Storage.BlobListOption.pageSize(1);
Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(getPrefix(configuration));
- Storage storage = buildClient(configuration);
Page<Blob> items = storage.list(container, limitOption, prefixOption);
if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
collector.warn(warning);
}
- } catch (CompilationException ex) {
- throw ex;
} catch (Exception ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
}
}
- public static List<Blob> listItems(Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
- IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
- IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
+ public static List<Blob> listItems(IApplicationContext appCtx, Map<String, String> configuration,
+ IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator)
+ throws CompilationException, HyracksDataException {
// Prepare to retrieve the objects
List<Blob> filesOnly = new ArrayList<>();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- Storage gcs = buildClient(configuration);
Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
Page<Blob> items;
- try {
+ try (Storage gcs = buildClient(appCtx, configuration)) {
items = gcs.list(container, options);
- } catch (BaseServiceException ex) {
+ } catch (Exception ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
}
@@ -224,61 +142,6 @@
}
}
- public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
- throws AlgebricksException {
- configureHdfsJobConf(conf, configuration, 0);
- }
-
- /**
- * Builds the client using the provided configuration
- *
- * @param configuration properties
- * @param numberOfPartitions number of partitions in the cluster
- */
- public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
- throws AlgebricksException {
- String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
- String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
- // disable caching FileSystem
- HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_GCS_PROTOCOL);
-
- // TODO(htowaileb): needs further testing, recommended to disable by gcs-hadoop team
- conf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, ExternalDataConstants.FALSE);
-
- // TODO(htowaileb): needs further testing
- // set number of threads
- // conf.set(GCSConstants.HADOOP_MAX_REQUESTS_PER_BATCH, String.valueOf(numberOfPartitions));
- // conf.set(GCSConstants.HADOOP_BATCH_THREADS, String.valueOf(numberOfPartitions));
-
- // authentication method
- if (jsonCredentials == null) {
- // anonymous access
- conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
- } else {
- try {
- JsonNode jsonCreds = JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
- // Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
- // in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in
- // version 3.x.y, which also removed support for hadoop-2
- conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
- jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
- conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
- jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
- conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
- jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
- } catch (JsonProcessingException e) {
- throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials",
- getMessageOrToString(e));
- }
- }
-
- // set endpoint if provided, default is https://storage.googleapis.com/
- if (endpoint != null) {
- conf.set(HADOOP_ENDPOINT, endpoint);
- }
- }
-
public static String getPath(Map<String, String> configuration) {
return GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+ configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'