Refactor ExternalDataUtils/Constants
Change-Id: Ie1f1499f13968d421bee43ec7352aea0c2749423
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15644
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
index 156b78a..48cf511 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RedactionUtil.java
@@ -20,7 +20,7 @@
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import static java.util.regex.Pattern.DOTALL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SECRET_ACCESS_KEY_FIELD_NAME;
import java.util.regex.Pattern;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f14af53..bbcf9cd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
@@ -32,7 +31,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -109,7 +108,7 @@
}
private boolean shouldRetry(String errorCode, int currentRetry) {
- return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+ return currentRetry < MAX_RETRIES && S3Utils.isRetryableError(errorCode);
}
@Override
@@ -134,7 +133,7 @@
private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
try {
- return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
+ return S3Utils.buildAwsS3Client(configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 89ea39e..a241354 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -26,6 +26,7 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -53,8 +54,7 @@
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
//Get a list of S3 objects
- List<S3Object> filesOnly =
- ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+ List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 803e657..ff93a46 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,6 +28,8 @@
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -52,7 +54,7 @@
//Configure Hadoop S3 input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
- ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+ S3Utils.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
}
@@ -89,8 +91,7 @@
throws CompilationException {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<S3Object> filesOnly =
- ExternalDataUtils.AwsS3.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+ List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
StringBuilder builder = new StringBuilder();
if (!filesOnly.isEmpty()) {
@@ -105,7 +106,7 @@
}
private static void appendFileURI(StringBuilder builder, String container, S3Object file) {
- builder.append(ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL);
+ builder.append(S3Constants.HADOOP_S3_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('/');
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index cdb3834..bbfece2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.azure.blob;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
@@ -31,7 +32,6 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.LogRedactionUtil;
@@ -86,7 +86,7 @@
private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
throws HyracksDataException {
try {
- return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
+ return buildAzureBlobClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 064b319..55c0521 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.external.input.record.reader.azure.blob;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;
+
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -57,9 +60,9 @@
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
- List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
- includeExcludeMatcher, warningCollector);
+ BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
+ List<BlobItem> filesOnly =
+ listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index e34d188..7a95222 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.record.reader.azure.datalake;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
@@ -31,7 +32,6 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.LogRedactionUtil;
@@ -86,7 +86,7 @@
private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
throws HyracksDataException {
try {
- return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
+ return buildAzureDatalakeClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index e9f8d4c..929cb6e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.external.input.record.reader.azure.datalake;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems;
+
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -57,9 +60,9 @@
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
- List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
- includeExcludeMatcher, warningCollector);
+ DataLakeServiceClient client = buildAzureDatalakeClient(appCtx, configuration);
+ List<PathItem> filesOnly =
+ listDatalakePathItems(client, configuration, includeExcludeMatcher, warningCollector);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index c2251df..e08013c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -18,6 +18,11 @@
*/
package org.apache.asterix.external.input.record.reader.azure.parquet;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -47,7 +52,7 @@
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
- BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
+ BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
//Get endpoint
String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
//Get path
@@ -57,7 +62,7 @@
//Configure Hadoop Azure input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
- ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+ configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}
@@ -94,8 +99,8 @@
private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
- includeExcludeMatcher, warningCollector);
+ List<BlobItem> filesOnly =
+ listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);
StringBuilder builder = new StringBuilder();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -118,7 +123,7 @@
}
private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
- builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL);
+ builder.append(HADOOP_AZURE_BLOB_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('@');
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index db87868..c98fc8b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -18,6 +18,11 @@
*/
package org.apache.asterix.external.input.record.reader.azure.parquet;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_DATALAKE_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureDatalakeClient;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.configureAzureHdfsJobConf;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listDatalakePathItems;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -47,8 +52,7 @@
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
- DataLakeServiceClient dataLakeServiceClient =
- ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
+ DataLakeServiceClient dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
//Get endpoint
String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
@@ -61,7 +65,7 @@
//Configure Hadoop Azure input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
- ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+ configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
}
@@ -98,8 +102,8 @@
private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
DataLakeServiceClient dataLakeServiceClient, String endPoint) throws CompilationException {
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(dataLakeServiceClient, configuration,
- includeExcludeMatcher, warningCollector);
+ List<PathItem> filesOnly =
+ listDatalakePathItems(dataLakeServiceClient, configuration, includeExcludeMatcher, warningCollector);
StringBuilder builder = new StringBuilder();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -122,7 +126,7 @@
}
private static void appendFileURI(StringBuilder builder, String container, String endPoint, PathItem file) {
- builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_DATALAKE_PROTOCOL);
+ builder.append(HADOOP_AZURE_DATALAKE_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('@');
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
index 652fa3e..007e8be 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStream.java
@@ -32,7 +32,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -134,7 +134,7 @@
private Storage buildClient(Map<String, String> configuration) throws HyracksDataException {
try {
- return ExternalDataUtils.GCS.buildClient(configuration);
+ return GCSUtils.buildClient(configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 0e7ea90..1bc51f2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.gcs;
import static org.apache.asterix.external.util.ExternalDataUtils.getIncludeExcludeMatchers;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.buildClient;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.util.ArrayList;
@@ -67,7 +68,7 @@
// Prepare to retrieve the objects
List<Blob> filesOnly = new ArrayList<>();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- Storage gcs = ExternalDataUtils.GCS.buildClient(configuration);
+ Storage gcs = buildClient(configuration);
Storage.BlobListOption options = Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
Page<Blob> items;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index f0b9c90..429706e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -333,107 +333,4 @@
*/
public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
}
-
- public static class AwsS3 {
- private AwsS3() {
- throw new AssertionError("do not instantiate");
- }
-
- public static final String REGION_FIELD_NAME = "region";
- public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
- public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
- public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
- public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
-
- // AWS S3 specific error codes
- public static final String ERROR_INTERNAL_ERROR = "InternalError";
- public static final String ERROR_SLOW_DOWN = "SlowDown";
- public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
-
- public static boolean isRetryableError(String errorCode) {
- return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
- }
-
- /*
- * Hadoop-AWS
- * AWS connectors for s3 and s3n are deprecated.
- */
- public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
- public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
- public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
- public static final String HADOOP_REGION = "fs.s3a.region";
- public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
-
- /*
- * Internal configurations
- */
- //Allows accessing directories as file system path
- public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
- //The number of maximum HTTP connections in connection pool
- public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum";
- //S3 used protocol
- public static final String HADOOP_S3_PROTOCOL = "s3a";
-
- //Hadoop credentials provider key
- public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider";
- //Anonymous credential provider
- public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
- //Temporary credential provider
- public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
-
- }
-
- /*
- * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties.
- * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
- */
- public static class Azure {
- private Azure() {
- throw new AssertionError("do not instantiate");
- }
-
- /*
- * Asterix Configuration Keys
- */
- public static final String MANAGED_IDENTITY_ID_FIELD_NAME = "managedIdentityId";
- public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
- public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
- public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature";
- public static final String TENANT_ID_FIELD_NAME = "tenantId";
- public static final String CLIENT_ID_FIELD_NAME = "clientId";
- public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret";
- public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate";
- public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
- public static final String ENDPOINT_FIELD_NAME = "endpoint";
-
- // Specific Azure data lake property
- /*
- The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example:
- storage/myData/personal/file1.json
- storage/myData/personal/file2.json
- storage/myData/file3.json
- If the prefix used is "myData", then only the file file3.json is read. However, if the property "recursive"
- is set to "true" when creating the external dataset, then it goes recursively overall the paths, and the result
- is file1.json, file2.json and file3.json.
- */
- public static final String RECURSIVE_FIELD_NAME = "recursive";
-
- /*
- * Hadoop-Azure
- */
- //Used when accountName and accessKey are provided
- public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key";
- //Used when a connectionString is provided
- public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
- public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
- public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss";
- }
-
- public static class GCS {
- private GCS() {
- throw new AssertionError("do not instantiate");
- }
-
- public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 8e38eed..702ef42 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,43 +18,9 @@
*/
package org.apache.asterix.external.util;
-import static com.google.cloud.storage.Storage.BlobListOption;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME;
import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
-import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT;
-import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ACCESS_KEY_ID;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ANONYMOUS_ACCESS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_CREDENTIAL_PROVIDER_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_PATH_STYLE_ACCESS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_PROTOCOL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SECRET_ACCESS_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_NAME_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_SECRET_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
@@ -65,18 +31,14 @@
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@@ -85,7 +47,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -106,20 +67,17 @@
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.runtime.evaluators.common.NumberUtils;
import org.apache.asterix.runtime.projection.DataProjectionInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
@@ -129,46 +87,6 @@
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import org.apache.hyracks.util.StorageUtil;
-import com.azure.core.credential.AzureSasCredential;
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.identity.ClientCertificateCredentialBuilder;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.identity.ManagedIdentityCredentialBuilder;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
-import com.azure.storage.blob.BlobServiceClientBuilder;
-import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.ListBlobsOptions;
-import com.azure.storage.common.StorageSharedKeyCredential;
-import com.azure.storage.common.policy.RequestRetryOptions;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-import com.azure.storage.file.datalake.models.ListPathsOptions;
-import com.azure.storage.file.datalake.models.PathItem;
-import com.google.api.gax.paging.Page;
-import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.cloud.storage.Blob;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-
-import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
-import software.amazon.awssdk.services.s3.model.S3Exception;
-import software.amazon.awssdk.services.s3.model.S3Object;
-import software.amazon.awssdk.services.s3.model.S3Response;
-
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
@@ -604,16 +522,16 @@
switch (type) {
case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
- AwsS3.validateProperties(configuration, srcLoc, collector);
+ S3Utils.validateProperties(configuration, srcLoc, collector);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
- Azure.validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
+ validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
- Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
+ validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
break;
case KEY_ADAPTER_NAME_GCS:
- GCS.validateProperties(configuration, srcLoc, collector);
+ validateProperties(configuration, srcLoc, collector);
break;
default:
// Nothing needs to be done
@@ -844,7 +762,7 @@
}
}
- private static boolean isParquetFormat(Map<String, String> properties) {
+ public static boolean isParquetFormat(Map<String, String> properties) {
String inputFormat = properties.get(ExternalDataConstants.KEY_INPUT_FORMAT);
return ExternalDataConstants.CLASS_NAME_PARQUET_INPUT_FORMAT.equals(inputFormat)
|| ExternalDataConstants.INPUT_FORMAT_PARQUET.equals(inputFormat)
@@ -893,1008 +811,6 @@
return encoder.encodeToString(byteArrayOutputStream.toByteArray());
}
- public static class AwsS3 {
- private AwsS3() {
- throw new AssertionError("do not instantiate");
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @return S3 client
- * @throws CompilationException CompilationException
- */
- public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
- // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
- String regionId = configuration.get(ExternalDataConstants.AwsS3.REGION_FIELD_NAME);
- String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
-
- S3ClientBuilder builder = S3Client.builder();
-
- // Credentials
- AwsCredentialsProvider credentialsProvider;
-
- // No auth required
- if (accessKeyId == null) {
- credentialsProvider = AnonymousCredentialsProvider.create();
- } else {
- // auth required, check for temporary or permanent credentials
- if (sessionToken != null) {
- credentialsProvider = StaticCredentialsProvider
- .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
- } else {
- credentialsProvider =
- StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
- }
- }
-
- builder.credentialsProvider(credentialsProvider);
-
- // Validate the region
- List<Region> regions = S3Client.serviceMetadata().regions();
- Optional<Region> selectedRegion =
- regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
-
- if (selectedRegion.isEmpty()) {
- throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
- }
- builder.region(selectedRegion.get());
-
- // Validate the service endpoint if present
- if (serviceEndpoint != null) {
- try {
- URI uri = new URI(serviceEndpoint);
- try {
- builder.endpointOverride(uri);
- } catch (NullPointerException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- } catch (URISyntaxException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
- String.format("Invalid service endpoint %s", serviceEndpoint));
- }
- }
-
- return builder.build();
- }
-
- /**
- * Builds the S3 client using the provided configuration
- *
- * @param configuration properties
- * @param numberOfPartitions number of partitions in the cluster
- */
- public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
- int numberOfPartitions) {
- String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
- String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
- String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
-
- //Disable caching S3 FileSystem
- HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
- /*
- * Authentication Methods:
- * 1- Anonymous: no accessKeyId and no secretAccessKey
- * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
- * 3- Private: has to provide accessKeyId and secretAccessKey
- */
- if (accessKeyId == null) {
- //Tells hadoop-aws it is an anonymous access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
- } else {
- conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
- conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
- if (sessionToken != null) {
- conf.set(HADOOP_SESSION_TOKEN, sessionToken);
- //Tells hadoop-aws it is a temporary access
- conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
- }
- }
-
- /*
- * This is to allow S3 definition to have path-style form. Should always be true to match the current
- * way we access files in S3
- */
- conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
-
- /*
- * Set the size of S3 connection pool to be the number of partitions
- */
- conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
-
- if (serviceEndpoint != null) {
- // Validation of the URL should be done at hadoop-aws level
- conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- } else {
- //Region is ignored and buckets could be found by the central endpoint
- conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
- }
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
-
- // check if the format property is present
- if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
-
- // Both parameters should be passed, or neither should be passed (for anonymous/no auth)
- String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
- if (accessKeyId == null || secretAccessKey == null) {
- // If one is passed, the other is required
- if (accessKeyId != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
- ACCESS_KEY_ID_FIELD_NAME);
- } else if (secretAccessKey != null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
- SECRET_ACCESS_KEY_FIELD_NAME);
- }
- }
-
- validateIncludeExclude(configuration);
-
- // Check if the bucket is present
- S3Client s3Client = buildAwsS3Client(configuration);
- S3Response response;
- boolean useOldApi = false;
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- String prefix = getPrefix(configuration);
-
- try {
- response = isBucketEmpty(s3Client, container, prefix, false);
- } catch (S3Exception ex) {
- // Method not implemented, try falling back to old API
- try {
- // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- useOldApi = true;
- response = isBucketEmpty(s3Client, container, prefix, true);
- } else {
- throw ex;
- }
- } catch (SdkException ex2) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- } finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
- }
-
- boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
- : ((ListObjectsV2Response) response).contents().isEmpty();
- if (isEmpty && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
-
- // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
- // ensure coverage, check if the result is successful as well and not only catch exceptions
- if (!response.sdkHttpResponse().isSuccessful()) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
- }
- }
-
- /**
- * Checks for a single object in the specified bucket to determine if the bucket is empty or not.
- *
- * @param s3Client s3 client
- * @param container the container name
- * @param prefix Prefix to be used
- * @param useOldApi flag whether to use the old API or not
- * @return returns the S3 response
- */
- private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
- S3Response response;
- if (useOldApi) {
- ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
- listObjectsBuilder.prefix(prefix);
- response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
- } else {
- ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
- listObjectsBuilder.prefix(prefix);
- response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
- }
- return response;
- }
-
- /**
- * Returns the lists of S3 objects.
- *
- * @param configuration properties
- * @param includeExcludeMatcher include/exclude matchers to apply
- */
- public static List<S3Object> listS3Objects(Map<String, String> configuration,
- IncludeExcludeMatcher includeExcludeMatcher, IWarningCollector warningCollector)
- throws CompilationException {
- // Prepare to retrieve the objects
- List<S3Object> filesOnly;
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildAwsS3Client(configuration);
- String prefix = getPrefix(configuration);
-
- try {
- filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
- } catch (S3Exception ex) {
- // New API is not implemented, try falling back to old API
- try {
- // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode()
- .equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
- filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
- } else {
- throw ex;
- }
- } catch (SdkException ex2) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- } catch (SdkException ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- } finally {
- if (s3Client != null) {
- CleanupUtils.close(s3Client, null);
- }
- }
-
- // Warn if no files are returned
- if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
- Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- warningCollector.warn(warning);
- }
-
- return filesOnly;
- }
-
- /**
- * Uses the latest API to retrieve the objects from the storage.
- *
- * @param s3Client S3 client
- * @param container container name
- * @param prefix definition prefix
- * @param includeExcludeMatcher include/exclude matchers to apply
- */
- private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
- IncludeExcludeMatcher includeExcludeMatcher) {
- String newMarker = null;
- List<S3Object> filesOnly = new ArrayList<>();
-
- ListObjectsV2Response listObjectsResponse;
- ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
- listObjectsBuilder.prefix(prefix);
-
- while (true) {
- // List the objects from the start, or from the last marker in case of truncated result
- if (newMarker == null) {
- listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
- } else {
- listObjectsResponse =
- s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
- }
-
- // Collect the paths to files only
- collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
-
- // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
- if (!listObjectsResponse.isTruncated()) {
- break;
- } else {
- newMarker = listObjectsResponse.nextContinuationToken();
- }
- }
-
- return filesOnly;
- }
-
- /**
- * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage
- *
- * @param s3Client S3 client
- * @param container container name
- * @param prefix definition prefix
- * @param includeExcludeMatcher include/exclude matchers to apply
- */
- private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
- IncludeExcludeMatcher includeExcludeMatcher) {
- String newMarker = null;
- List<S3Object> filesOnly = new ArrayList<>();
-
- ListObjectsResponse listObjectsResponse;
- ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
- listObjectsBuilder.prefix(prefix);
-
- while (true) {
- // List the objects from the start, or from the last marker in case of truncated result
- if (newMarker == null) {
- listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
- } else {
- listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
- }
-
- // Collect the paths to files only
- collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
-
- // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
- if (!listObjectsResponse.isTruncated()) {
- break;
- } else {
- newMarker = listObjectsResponse.nextMarker();
- }
- }
-
- return filesOnly;
- }
-
- /**
- * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered
- * a file if it does not end up with a "/" which is the separator in a folder structure.
- *
- * @param s3Objects List of returned objects
- */
- private static void collectAndFilterFiles(List<S3Object> s3Objects,
- BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<S3Object> filesOnly) {
- for (S3Object object : s3Objects) {
- // skip folders
- if (object.key().endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, object.key())) {
- filesOnly.add(object);
- }
- }
- }
- }
-
- /*
- * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties.
- * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
- */
- public static class Azure {
- private Azure() {
- throw new AssertionError("do not instantiate");
- }
-
- /**
- * Builds the Azure storage account using the provided configuration
- *
- * @param configuration properties
- * @return client
- */
- public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx,
- Map<String, String> configuration) throws CompilationException {
- String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
- String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
- String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
- String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
- String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
- String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
- String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
- String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
- String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
- // Client builder
- BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
- int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
- RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
- builder.retryOptions(requestRetryOptions);
-
- // Endpoint is required
- if (endpoint == null) {
- throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
- }
- builder.endpoint(endpoint);
-
- // Shared Key
- if (accountName != null || accountKey != null) {
- if (accountName == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
- ACCOUNT_KEY_FIELD_NAME);
- }
-
- if (accountKey == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
- ACCOUNT_NAME_FIELD_NAME);
- }
-
- Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
- MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
- CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- ACCOUNT_KEY_FIELD_NAME);
- }
- StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
- builder.credential(credential);
- }
-
- // Shared access signature
- if (sharedAccessSignature != null) {
- Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
- CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
- CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- }
- AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
- builder.credential(credential);
- }
-
- // Managed Identity auth
- if (managedIdentityId != null) {
- Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
- CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
- TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- MANAGED_IDENTITY_ID_FIELD_NAME);
- }
- builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
- }
-
- // Client secret & certificate auth
- if (clientId != null) {
- // Both (or neither) client secret and client secret were provided, only one is allowed
- if ((clientSecret == null) == (clientCertificate == null)) {
- if (clientSecret != null) {
- throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
- CLIENT_CERTIFICATE_FIELD_NAME);
- } else {
- throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
- CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
- }
- }
-
- // Tenant ID is required
- if (tenantId == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
- CLIENT_ID_FIELD_NAME);
- }
-
- // Client certificate password is not allowed if client secret is used
- if (clientCertificatePassword != null && clientSecret != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
- CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
- }
-
- // Use AD authentication
- if (clientSecret != null) {
- ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
- secret.clientId(clientId);
- secret.tenantId(tenantId);
- secret.clientSecret(clientSecret);
- builder.credential(secret.build());
- } else {
- // Certificate
- ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
- certificate.clientId(clientId);
- certificate.tenantId(tenantId);
- try {
- InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
- if (clientCertificatePassword == null) {
- Method pemCertificate = ClientCertificateCredentialBuilder.class
- .getDeclaredMethod("pemCertificate", InputStream.class);
- pemCertificate.setAccessible(true);
- pemCertificate.invoke(certificate, certificateContent);
- } else {
- Method pemCertificate = ClientCertificateCredentialBuilder.class
- .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
- pemCertificate.setAccessible(true);
- pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
- }
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
- throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex.getMessage());
- }
- builder.credential(certificate.build());
- }
- }
-
- // If client id is not present, ensure client secret, certificate, tenant id and client certificate
- // password are not present
- if (clientId == null) {
- Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
- CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- }
- }
-
- try {
- return builder.buildClient();
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
-
- /**
- * Builds the Azure data lake storage account using the provided configuration
- *
- * @param configuration properties
- * @return client
- */
- public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx,
- Map<String, String> configuration) throws CompilationException {
- String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
- String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
- String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
- String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
- String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
- String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
- String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
- String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
- String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
-
- // Client builder
- DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
- int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
- RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
- builder.retryOptions(requestRetryOptions);
-
- // Endpoint is required
- if (endpoint == null) {
- throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
- }
- builder.endpoint(endpoint);
-
- // Shared Key
- if (accountName != null || accountKey != null) {
- if (accountName == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
- ACCOUNT_KEY_FIELD_NAME);
- }
-
- if (accountKey == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
- ACCOUNT_NAME_FIELD_NAME);
- }
-
- Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
- MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
- CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- ACCOUNT_KEY_FIELD_NAME);
- }
- StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
- builder.credential(credential);
- }
-
- // Shared access signature
- if (sharedAccessSignature != null) {
- Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
- CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
- CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- }
- AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
- builder.credential(credential);
- }
-
- // Managed Identity auth
- if (managedIdentityId != null) {
- Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
- CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
- TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- MANAGED_IDENTITY_ID_FIELD_NAME);
- }
- builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
- }
-
- // Client secret & certificate auth
- if (clientId != null) {
- // Both (or neither) client secret and client secret were provided, only one is allowed
- if ((clientSecret == null) == (clientCertificate == null)) {
- if (clientSecret != null) {
- throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
- CLIENT_CERTIFICATE_FIELD_NAME);
- } else {
- throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
- CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
- }
- }
-
- // Tenant ID is required
- if (tenantId == null) {
- throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
- CLIENT_ID_FIELD_NAME);
- }
-
- // Client certificate password is not allowed if client secret is used
- if (clientCertificatePassword != null && clientSecret != null) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
- CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
- }
-
- // Use AD authentication
- if (clientSecret != null) {
- ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
- secret.clientId(clientId);
- secret.tenantId(tenantId);
- secret.clientSecret(clientSecret);
- builder.credential(secret.build());
- } else {
- // Certificate
- ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
- certificate.clientId(clientId);
- certificate.tenantId(tenantId);
- try {
- InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
- if (clientCertificatePassword == null) {
- Method pemCertificate = ClientCertificateCredentialBuilder.class
- .getDeclaredMethod("pemCertificate", InputStream.class);
- pemCertificate.setAccessible(true);
- pemCertificate.invoke(certificate, certificateContent);
- } else {
- Method pemCertificate = ClientCertificateCredentialBuilder.class
- .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
- pemCertificate.setAccessible(true);
- pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
- }
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
- throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- builder.credential(certificate.build());
- }
- }
-
- // If client id is not present, ensure client secret, certificate, tenant id and client certificate
- // password are not present
- if (clientId == null) {
- Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
- CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
- if (provided.isPresent()) {
- throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
- SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- }
- }
-
- try {
- return builder.buildClient();
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
-
- public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient,
- Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
- IWarningCollector warningCollector) throws CompilationException {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- List<BlobItem> filesOnly = new ArrayList<>();
-
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
-
- BlobContainerClient blobContainer;
- try {
- blobContainer = blobServiceClient.getBlobContainerClient(container);
-
- // Get all objects in a container and extract the paths to files
- ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
- listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration));
- Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
-
- // Collect the paths to files only
- collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
-
- // Warn if no files are returned
- if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
- Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- warningCollector.warn(warning);
- }
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
-
- return filesOnly;
- }
-
- /**
- * Collects and filters the files only, and excludes any folders
- *
- * @param items storage items
- * @param predicate predicate to test with for file filtration
- * @param matchers include/exclude matchers to test against
- * @param filesOnly List containing the files only (excluding folders)
- */
- private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
- BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
- for (BlobItem item : items) {
- String uri = item.getName();
-
- // skip folders
- if (uri.endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, uri)) {
- filesOnly.add(item);
- }
- }
- }
-
- public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client,
- Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
- IWarningCollector warningCollector) throws CompilationException {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- List<PathItem> filesOnly = new ArrayList<>();
-
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
-
- DataLakeFileSystemClient fileSystemClient;
- try {
- fileSystemClient = client.getFileSystemClient(container);
-
- // Get all objects in a container and extract the paths to files
- ListPathsOptions listOptions = new ListPathsOptions();
- boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
- listOptions.setRecursive(recursive);
- listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
- PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
-
- // Collect the paths to files only
- collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
-
- // Warn if no files are returned
- if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
- Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- warningCollector.warn(warning);
- }
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
-
- return filesOnly;
- }
-
- /**
- * Collects and filters the files only, and excludes any folders
- *
- * @param items storage items
- * @param predicate predicate to test with for file filtration
- * @param matchers include/exclude matchers to test against
- * @param filesOnly List containing the files only (excluding folders)
- */
- private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
- BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
- for (PathItem item : items) {
- String uri = item.getName();
-
- // skip folders
- if (uri.endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, uri)) {
- filesOnly.add(item);
- }
- }
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
-
- // check if the format property is present
- if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
-
- validateIncludeExclude(configuration);
-
- // Check if the bucket is present
- BlobServiceClient blobServiceClient;
- try {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- blobServiceClient = buildAzureBlobClient(appCtx, configuration);
- BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
-
- // Get all objects in a container and extract the paths to files
- ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
- listBlobsOptions.setPrefix(getPrefix(configuration));
- Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
-
- if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
- } catch (CompilationException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
-
- // check if the format property is present
- if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
-
- validateIncludeExclude(configuration);
-
- // Check if the bucket is present
- DataLakeServiceClient dataLakeServiceClient;
- try {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
- DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
-
- // Get all objects in a container and extract the paths to files
- ListPathsOptions listPathsOptions = new ListPathsOptions();
- listPathsOptions.setPath(getPrefix(configuration));
- Iterable<PathItem> blobItems = fileSystemClient.listPaths(listPathsOptions, null);
-
- if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
- } catch (CompilationException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
-
- /**
- * Builds the Azure Blob storage client using the provided configuration
- *
- * @param configuration properties
- * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure
- * Blob storage</a>
- */
- public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
- String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-
- //Disable caching S3 FileSystem
- HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
-
- //Key for Hadoop configuration
- StringBuilder hadoopKey = new StringBuilder();
- //Value for Hadoop configuration
- String hadoopValue;
- if (accountKey != null || sharedAccessSignature != null) {
- if (accountKey != null) {
- hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.');
- //Set only the AccountKey
- hadoopValue = accountKey;
- } else {
- //Use SAS for Hadoop FS as connectionString is provided
- hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.');
- //Setting the container is required for SAS
- hadoopKey.append(container).append('.');
- //Set the connection string for SAS
- hadoopValue = sharedAccessSignature;
- }
- //Set the endPoint, which includes the AccountName
- hadoopKey.append(endPoint);
- //Tells Hadoop we are reading from Blob Storage
- conf.set(hadoopKey.toString(), hadoopValue);
- }
- }
- }
-
- public static class GCS {
- private GCS() {
- throw new AssertionError("do not instantiate");
-
- }
-
- //TODO(htowaileb): Add validation step similar to other externals, which also checks if empty bucket
- //upon creating the external dataset
-
- /**
- * Builds the client using the provided configuration
- *
- * @param configuration properties
- * @return clientasterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
- * @throws CompilationException CompilationException
- */
- public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
- String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
-
- StorageOptions.Builder builder = StorageOptions.newBuilder();
-
- // Use credentials if available
- if (jsonCredentials != null) {
- try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
- builder.setCredentials(ServiceAccountCredentials.fromStream(credentialsStream));
- } catch (IOException ex) {
- throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
-
- return builder.build().getService();
- }
-
- /**
- * Validate external dataset properties
- *
- * @param configuration properties
- * @throws CompilationException Compilation exception
- */
- public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
-
- // check if the format property is present
- if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
- throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
- }
-
- // parquet is not supported for google cloud storage
- if (isParquetFormat(configuration)) {
- throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT,
- configuration.get(KEY_FORMAT));
- }
-
- validateIncludeExclude(configuration);
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- try {
- BlobListOption limitOption = BlobListOption.pageSize(1);
- BlobListOption prefixOption = BlobListOption.prefix(getPrefix(configuration));
- Storage storage = buildClient(configuration);
- Page<Blob> items = storage.list(container, limitOption, prefixOption);
-
- if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
- Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
- } catch (CompilationException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
- }
-
public static int roundUpToNearestFrameSize(int size, int framesize) {
return ((size / framesize) + 1) * framesize;
}
@@ -1911,7 +827,7 @@
return maxArgSz;
}
- private static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
+ public static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
return Arrays.stream(parameters).filter(field -> configuration.get(field) != null).findFirst();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
new file mode 100644
index 0000000..e1c10ad
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+public class S3Constants {
+ private S3Constants() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static final String REGION_FIELD_NAME = "region";
+ public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
+ public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
+ public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
+ public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+ // AWS S3 specific error codes
+ public static final String ERROR_INTERNAL_ERROR = "InternalError";
+ public static final String ERROR_SLOW_DOWN = "SlowDown";
+ public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+ /*
+ * Hadoop-AWS
+ * AWS connectors for s3 and s3n are deprecated.
+ */
+ public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
+ public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
+ public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
+ public static final String HADOOP_REGION = "fs.s3a.region";
+ public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
+
+ /*
+ * Internal configurations
+ */
+ //Allows accessing directories as file system path
+ public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
+ //The number of maximum HTTP connections in connection pool
+ public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum";
+ //S3 used protocol
+ public static final String HADOOP_S3_PROTOCOL = "s3a";
+
+ //Hadoop credentials provider key
+ public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider";
+ //Anonymous credential provider
+ public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
+ //Temporary credential provider
+ public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
new file mode 100644
index 0000000..a88d59b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.aws.s3;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.*;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.util.CleanupUtils;
+
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.S3Response;
+
+public class S3Utils {
+ private S3Utils() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static boolean isRetryableError(String errorCode) {
+ return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+ }
+
+ /**
+ * Builds the S3 client using the provided configuration
+ *
+ * @param configuration properties
+ * @return S3 client
+ * @throws CompilationException CompilationException
+ */
+ public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
+ // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+ String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+ String regionId = configuration.get(REGION_FIELD_NAME);
+ String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+ S3ClientBuilder builder = S3Client.builder();
+
+ // Credentials
+ AwsCredentialsProvider credentialsProvider;
+
+ // No auth required
+ if (accessKeyId == null) {
+ credentialsProvider = AnonymousCredentialsProvider.create();
+ } else {
+ // auth required, check for temporary or permanent credentials
+ if (sessionToken != null) {
+ credentialsProvider = StaticCredentialsProvider
+ .create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+ } else {
+ credentialsProvider =
+ StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+ }
+ }
+
+ builder.credentialsProvider(credentialsProvider);
+
+ // Validate the region
+ List<Region> regions = S3Client.serviceMetadata().regions();
+ Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
+
+ if (selectedRegion.isEmpty()) {
+ throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
+ }
+ builder.region(selectedRegion.get());
+
+ // Validate the service endpoint if present
+ if (serviceEndpoint != null) {
+ try {
+ URI uri = new URI(serviceEndpoint);
+ try {
+ builder.endpointOverride(uri);
+ } catch (NullPointerException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ } catch (URISyntaxException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ String.format("Invalid service endpoint %s", serviceEndpoint));
+ }
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Builds the S3 client using the provided configuration
+ *
+ * @param configuration properties
+ * @param numberOfPartitions number of partitions in the cluster
+ */
+ public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
+ int numberOfPartitions) {
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+ String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+ String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+
+ //Disable caching S3 FileSystem
+ HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
+
+ /*
+ * Authentication Methods:
+ * 1- Anonymous: no accessKeyId and no secretAccessKey
+ * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
+ * 3- Private: has to provide accessKeyId and secretAccessKey
+ */
+ if (accessKeyId == null) {
+ //Tells hadoop-aws it is an anonymous access
+ conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
+ } else {
+ conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
+ conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
+ if (sessionToken != null) {
+ conf.set(HADOOP_SESSION_TOKEN, sessionToken);
+ //Tells hadoop-aws it is a temporary access
+ conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
+ }
+ }
+
+ /*
+ * This is to allow S3 definition to have path-style form. Should always be true to match the current
+ * way we access files in S3
+ */
+ conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
+
+ /*
+ * Set the size of S3 connection pool to be the number of partitions
+ */
+ conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+
+ if (serviceEndpoint != null) {
+ // Validation of the URL should be done at hadoop-aws level
+ conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
+ } else {
+ //Region is ignored and buckets could be found by the central endpoint
+ conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
+ }
+ }
+
+ /**
+ * Validate external dataset properties
+ *
+ * @param configuration properties
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector) throws CompilationException {
+
+ // check if the format property is present
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ // Both parameters should be passed, or neither should be passed (for anonymous/no auth)
+ String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+ if (accessKeyId == null || secretAccessKey == null) {
+ // If one is passed, the other is required
+ if (accessKeyId != null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
+ ACCESS_KEY_ID_FIELD_NAME);
+ } else if (secretAccessKey != null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+ SECRET_ACCESS_KEY_FIELD_NAME);
+ }
+ }
+
+ validateIncludeExclude(configuration);
+
+ // Check if the bucket is present
+ S3Client s3Client = buildAwsS3Client(configuration);
+ S3Response response;
+ boolean useOldApi = false;
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ String prefix = getPrefix(configuration);
+
+ try {
+ response = isBucketEmpty(s3Client, container, prefix, false);
+ } catch (S3Exception ex) {
+ // Method not implemented, try falling back to old API
+ try {
+ // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+ if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
+ useOldApi = true;
+ response = isBucketEmpty(s3Client, container, prefix, true);
+ } else {
+ throw ex;
+ }
+ } catch (SdkException ex2) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
+ }
+
+ boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
+ : ((ListObjectsV2Response) response).contents().isEmpty();
+ if (isEmpty && collector.shouldWarn()) {
+ Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+
+ // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
+ // ensure coverage, check if the result is successful as well and not only catch exceptions
+ if (!response.sdkHttpResponse().isSuccessful()) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+ }
+ }
+
+ /**
+ * Checks for a single object in the specified bucket to determine if the bucket is empty or not.
+ *
+ * @param s3Client s3 client
+ * @param container the container name
+ * @param prefix Prefix to be used
+ * @param useOldApi flag whether to use the old API or not
+ * @return returns the S3 response
+ */
+ private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
+ S3Response response;
+ if (useOldApi) {
+ ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
+ listObjectsBuilder.prefix(prefix);
+ response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
+ } else {
+ ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
+ listObjectsBuilder.prefix(prefix);
+ response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
+ }
+ return response;
+ }
+
+ /**
+ * Returns the lists of S3 objects.
+ *
+ * @param configuration properties
+ * @param includeExcludeMatcher include/exclude matchers to apply
+ */
+ public static List<S3Object> listS3Objects(Map<String, String> configuration,
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+ IWarningCollector warningCollector) throws CompilationException {
+ // Prepare to retrieve the objects
+ List<S3Object> filesOnly;
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ S3Client s3Client = buildAwsS3Client(configuration);
+ String prefix = getPrefix(configuration);
+
+ try {
+ filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+ } catch (S3Exception ex) {
+ // New API is not implemented, try falling back to old API
+ try {
+ // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+ if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
+ filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+ } else {
+ throw ex;
+ }
+ } catch (SdkException ex2) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
+ }
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Uses the latest API to retrieve the objects from the storage.
+ *
+ * @param s3Client S3 client
+ * @param container container name
+ * @param prefix definition prefix
+ * @param includeExcludeMatcher include/exclude matchers to apply
+ */
+ private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+ String newMarker = null;
+ List<S3Object> filesOnly = new ArrayList<>();
+
+ ListObjectsV2Response listObjectsResponse;
+ ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
+ listObjectsBuilder.prefix(prefix);
+
+ while (true) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ }
+
+ // Collect the paths to files only
+ collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+ if (!listObjectsResponse.isTruncated()) {
+ break;
+ } else {
+ newMarker = listObjectsResponse.nextContinuationToken();
+ }
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage
+ *
+ * @param s3Client S3 client
+ * @param container container name
+ * @param prefix definition prefix
+ * @param includeExcludeMatcher include/exclude matchers to apply
+ */
+ private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+ String newMarker = null;
+ List<S3Object> filesOnly = new ArrayList<>();
+
+ ListObjectsResponse listObjectsResponse;
+ ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
+ listObjectsBuilder.prefix(prefix);
+
+ while (true) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
+ }
+
+ // Collect the paths to files only
+ collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+ if (!listObjectsResponse.isTruncated()) {
+ break;
+ } else {
+ newMarker = listObjectsResponse.nextMarker();
+ }
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered
+ * a file if it does not end up with a "/" which is the separator in a folder structure.
+ *
+ * @param s3Objects List of returned objects
+ */
+ private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate,
+ List<Matcher> matchers, List<S3Object> filesOnly) {
+ for (S3Object object : s3Objects) {
+ // skip folders
+ if (object.key().endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, object.key())) {
+ filesOnly.add(object);
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java
new file mode 100644
index 0000000..9ade27b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureConstants.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.azure.blob_storage;
+
+/*
+ * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties.
+ * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
+ */
+public class AzureConstants {
+ private AzureConstants() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ /*
+ * Asterix Configuration Keys
+ */
+ public static final String MANAGED_IDENTITY_ID_FIELD_NAME = "managedIdentityId";
+ public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
+ public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
+ public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature";
+ public static final String TENANT_ID_FIELD_NAME = "tenantId";
+ public static final String CLIENT_ID_FIELD_NAME = "clientId";
+ public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret";
+ public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate";
+ public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
+ public static final String ENDPOINT_FIELD_NAME = "endpoint";
+
+ // Specific Azure data lake property
+ /*
+ The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example:
+ storage/myData/personal/file1.json
+ storage/myData/personal/file2.json
+ storage/myData/file3.json
+ If the prefix used is "myData", then only the file file3.json is read. However, if the property "recursive"
+ is set to "true" when creating the external dataset, then it goes recursively overall the paths, and the result
+ is file1.json, file2.json and file3.json.
+ */
+ public static final String RECURSIVE_FIELD_NAME = "recursive";
+
+ /*
+ * Hadoop-Azure
+ */
+ //Used when accountName and accessKey are provided
+ public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key";
+ //Used when a connectionString is provided
+ public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
+ public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
+ public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss";
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
new file mode 100644
index 0000000..0dc9ad2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.azure.blob_storage;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAMETERS_REQUIRED;
+import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT;
+import static org.apache.asterix.external.util.ExternalDataUtils.getFirstNotNull;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ACCOUNT_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ACCOUNT_NAME_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_CERTIFICATE_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.CLIENT_SECRET_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_FS_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.HADOOP_AZURE_FS_SAS;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.MANAGED_IDENTITY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.RECURSIVE_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
+import static org.apache.asterix.external.util.azure.blob_storage.AzureConstants.TENANT_ID_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.identity.ClientCertificateCredentialBuilder;
+import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.policy.RequestRetryOptions;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import com.azure.storage.file.datalake.models.PathItem;
+
+public class AzureUtils {
+ private AzureUtils() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ /**
+ * Builds the Azure storage account using the provided configuration
+ *
+ * @param configuration properties
+ * @return client
+ */
+ public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx, Map<String, String> configuration)
+ throws CompilationException {
+ String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+ String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+ String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+ String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+ String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+ String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+ String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+ String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+ String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+ // Client builder
+ BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
+ int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+ RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+ builder.retryOptions(requestRetryOptions);
+
+ // Endpoint is required
+ if (endpoint == null) {
+ throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+ }
+ builder.endpoint(endpoint);
+
+ // Shared Key
+ if (accountName != null || accountKey != null) {
+ if (accountName == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+ ACCOUNT_KEY_FIELD_NAME);
+ }
+
+ if (accountKey == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+ ACCOUNT_NAME_FIELD_NAME);
+ }
+
+ Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+ MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ ACCOUNT_KEY_FIELD_NAME);
+ }
+ StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+ builder.credential(credential);
+ }
+
+ // Shared access signature
+ if (sharedAccessSignature != null) {
+ Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+ CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+ CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ }
+ AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+ builder.credential(credential);
+ }
+
+ // Managed Identity auth
+ if (managedIdentityId != null) {
+ Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ MANAGED_IDENTITY_ID_FIELD_NAME);
+ }
+ builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+ }
+
+ // Client secret & certificate auth
+ if (clientId != null) {
+ // Both (or neither) client secret and client secret were provided, only one is allowed
+ if ((clientSecret == null) == (clientCertificate == null)) {
+ if (clientSecret != null) {
+ throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME);
+ } else {
+ throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+ CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+ }
+ }
+
+ // Tenant ID is required
+ if (tenantId == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+ CLIENT_ID_FIELD_NAME);
+ }
+
+ // Client certificate password is not allowed if client secret is used
+ if (clientCertificatePassword != null && clientSecret != null) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+ CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+ }
+
+ // Use AD authentication
+ if (clientSecret != null) {
+ ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+ secret.clientId(clientId);
+ secret.tenantId(tenantId);
+ secret.clientSecret(clientSecret);
+ builder.credential(secret.build());
+ } else {
+ // Certificate
+ ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+ certificate.clientId(clientId);
+ certificate.tenantId(tenantId);
+ try {
+ InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+ if (clientCertificatePassword == null) {
+ Method pemCertificate = ClientCertificateCredentialBuilder.class
+ .getDeclaredMethod("pemCertificate", InputStream.class);
+ pemCertificate.setAccessible(true);
+ pemCertificate.invoke(certificate, certificateContent);
+ } else {
+ Method pemCertificate = ClientCertificateCredentialBuilder.class
+ .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+ pemCertificate.setAccessible(true);
+ pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+ }
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+ throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ builder.credential(certificate.build());
+ }
+ }
+
+ // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+ // password are not present
+ if (clientId == null) {
+ Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ }
+ }
+
+ try {
+ return builder.buildClient();
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ /**
+ * Builds the Azure data lake storage account using the provided configuration
+ *
+ * @param configuration properties
+ * @return client
+ */
+ public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx,
+ Map<String, String> configuration) throws CompilationException {
+ String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+ String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+ String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+ String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+ String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+ String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+ String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+ String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+ String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+ // Client builder
+ DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
+ int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+ RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+ builder.retryOptions(requestRetryOptions);
+
+ // Endpoint is required
+ if (endpoint == null) {
+ throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+ }
+ builder.endpoint(endpoint);
+
+ // Shared Key
+ if (accountName != null || accountKey != null) {
+ if (accountName == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+ ACCOUNT_KEY_FIELD_NAME);
+ }
+
+ if (accountKey == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+ ACCOUNT_NAME_FIELD_NAME);
+ }
+
+ Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+ MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ ACCOUNT_KEY_FIELD_NAME);
+ }
+ StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+ builder.credential(credential);
+ }
+
+ // Shared access signature
+ if (sharedAccessSignature != null) {
+ Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+ CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+ CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ }
+ AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+ builder.credential(credential);
+ }
+
+ // Managed Identity auth
+ if (managedIdentityId != null) {
+ Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ MANAGED_IDENTITY_ID_FIELD_NAME);
+ }
+ builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+ }
+
+ // Client secret & certificate auth
+ if (clientId != null) {
+ // Both (or neither) client secret and client secret were provided, only one is allowed
+ if ((clientSecret == null) == (clientCertificate == null)) {
+ if (clientSecret != null) {
+ throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME);
+ } else {
+ throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+ CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+ }
+ }
+
+ // Tenant ID is required
+ if (tenantId == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+ CLIENT_ID_FIELD_NAME);
+ }
+
+ // Client certificate password is not allowed if client secret is used
+ if (clientCertificatePassword != null && clientSecret != null) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+ CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+ }
+
+ // Use AD authentication
+ if (clientSecret != null) {
+ ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+ secret.clientId(clientId);
+ secret.tenantId(tenantId);
+ secret.clientSecret(clientSecret);
+ builder.credential(secret.build());
+ } else {
+ // Certificate
+ ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+ certificate.clientId(clientId);
+ certificate.tenantId(tenantId);
+ try {
+ InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+ if (clientCertificatePassword == null) {
+ Method pemCertificate = ClientCertificateCredentialBuilder.class
+ .getDeclaredMethod("pemCertificate", InputStream.class);
+ pemCertificate.setAccessible(true);
+ pemCertificate.invoke(certificate, certificateContent);
+ } else {
+ Method pemCertificate = ClientCertificateCredentialBuilder.class
+ .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+ pemCertificate.setAccessible(true);
+ pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+ }
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+ throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ builder.credential(certificate.build());
+ }
+ }
+
+ // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+ // password are not present
+ if (clientId == null) {
+ Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ }
+ }
+
+ try {
+ return builder.buildClient();
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient, Map<String, String> configuration,
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+ IWarningCollector warningCollector) throws CompilationException {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ List<BlobItem> filesOnly = new ArrayList<>();
+
+ // Ensure the validity of include/exclude
+ validateIncludeExclude(configuration);
+
+ BlobContainerClient blobContainer;
+ try {
+ blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+ listBlobsOptions.setPrefix(getPrefix(configuration));
+ Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+ // Collect the paths to files only
+ collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Collects and filters the files only, and excludes any folders
+ *
+ * @param items storage items
+ * @param predicate predicate to test with for file filtration
+ * @param matchers include/exclude matchers to test against
+ * @param filesOnly List containing the files only (excluding folders)
+ */
+ private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
+ BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
+ for (BlobItem item : items) {
+ String uri = item.getName();
+
+ // skip folders
+ if (uri.endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, uri)) {
+ filesOnly.add(item);
+ }
+ }
+ }
+
+ public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client, Map<String, String> configuration,
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+ IWarningCollector warningCollector) throws CompilationException {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ List<PathItem> filesOnly = new ArrayList<>();
+
+ // Ensure the validity of include/exclude
+ validateIncludeExclude(configuration);
+
+ DataLakeFileSystemClient fileSystemClient;
+ try {
+ fileSystemClient = client.getFileSystemClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListPathsOptions listOptions = new ListPathsOptions();
+ boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+ listOptions.setRecursive(recursive);
+ listOptions.setPath(getPrefix(configuration, false));
+ PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+ // Collect the paths to files only
+ collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Collects and filters the files only, and excludes any folders
+ *
+ * @param items storage items
+ * @param predicate predicate to test with for file filtration
+ * @param matchers include/exclude matchers to test against
+ * @param filesOnly List containing the files only (excluding folders)
+ */
+ private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
+ BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
+ for (PathItem item : items) {
+ String uri = item.getName();
+
+ // skip folders
+ if (uri.endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, uri)) {
+ filesOnly.add(item);
+ }
+ }
+ }
+
+ /**
+ * Validate external dataset properties
+ *
+ * @param configuration properties
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
+
+ // check if the format property is present
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ validateIncludeExclude(configuration);
+
+ // Check if the bucket is present
+ BlobServiceClient blobServiceClient;
+ try {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ blobServiceClient = buildAzureBlobClient(appCtx, configuration);
+ BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+ listBlobsOptions.setPrefix(getPrefix(configuration));
+ Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+ if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+ Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+ } catch (CompilationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ /**
+ * Validate external dataset properties
+ *
+ * @param configuration properties
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
+
+ // check if the format property is present
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ validateIncludeExclude(configuration);
+
+ // Check if the bucket is present
+ DataLakeServiceClient dataLakeServiceClient;
+ try {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
+ DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListPathsOptions listPathsOptions = new ListPathsOptions();
+ listPathsOptions.setPath(getPrefix(configuration));
+ Iterable<PathItem> blobItems = fileSystemClient.listPaths(listPathsOptions, null);
+
+ if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+ Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+ } catch (CompilationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ /**
+ * Builds the Azure Blob storage client using the provided configuration
+ *
+ * @param configuration properties
+ * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure
+ * Blob storage</a>
+ */
+ public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+ String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+
+ //Disable caching S3 FileSystem
+ HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
+
+ //Key for Hadoop configuration
+ StringBuilder hadoopKey = new StringBuilder();
+ //Value for Hadoop configuration
+ String hadoopValue;
+ if (accountKey != null || sharedAccessSignature != null) {
+ if (accountKey != null) {
+ hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.');
+ //Set only the AccountKey
+ hadoopValue = accountKey;
+ } else {
+ //Use SAS for Hadoop FS as connectionString is provided
+ hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.');
+ //Setting the container is required for SAS
+ hadoopKey.append(container).append('.');
+ //Set the connection string for SAS
+ hadoopValue = sharedAccessSignature;
+ }
+ //Set the endPoint, which includes the AccountName
+ hadoopKey.append(endPoint);
+ //Tells Hadoop we are reading from Blob Storage
+ conf.set(hadoopKey.toString(), hadoopValue);
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
new file mode 100644
index 0000000..8a0be99
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+public class GCSConstants {
+ private GCSConstants() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
new file mode 100644
index 0000000..553733f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.gcs;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.google.api.gax.paging.Page;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+public class GCSUtils {
+ private GCSUtils() {
+ throw new AssertionError("do not instantiate");
+
+ }
+
+ //TODO(htowaileb): Add validation step similar to other externals, which also checks if empty bucket
+ //upon creating the external dataset
+
+ /**
+ * Builds the client using the provided configuration
+ *
+ * @param configuration properties
+ * @return clientasterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+ * @throws CompilationException CompilationException
+ */
+ public static Storage buildClient(Map<String, String> configuration) throws CompilationException {
+ String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
+
+ StorageOptions.Builder builder = StorageOptions.newBuilder();
+
+ // Use credentials if available
+ if (jsonCredentials != null) {
+ try (InputStream credentialsStream = new ByteArrayInputStream(jsonCredentials.getBytes())) {
+ builder.setCredentials(ServiceAccountCredentials.fromStream(credentialsStream));
+ } catch (IOException ex) {
+ throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ return builder.build().getService();
+ }
+
+ /**
+ * Validate external dataset properties
+ *
+ * @param configuration properties
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector) throws CompilationException {
+
+ // check if the format property is present
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ // parquet is not supported for google cloud storage
+ if (isParquetFormat(configuration)) {
+ throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT, configuration.get(KEY_FORMAT));
+ }
+
+ validateIncludeExclude(configuration);
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ try {
+ Storage.BlobListOption limitOption = Storage.BlobListOption.pageSize(1);
+ Storage.BlobListOption prefixOption = Storage.BlobListOption.prefix(getPrefix(configuration));
+ Storage storage = buildClient(configuration);
+ Page<Blob> items = storage.list(container, limitOption, prefixOption);
+
+ if (!items.iterateAll().iterator().hasNext() && collector.shouldWarn()) {
+ Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+ } catch (CompilationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 90ea04b..91afbd8 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.input.record.reader.awss3;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
import java.lang.reflect.Field;
import java.lang.reflect.Method;