[ASTERIXDB-2827][EXT]: S3 external dataset: properly fallback to old API
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Properly fallback to old API if the new API is not supported.
if the old API fails as well, then report the error properly.
Change-Id: Ib453eb396def92218951b9e45a89b6c0f48a54f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9844
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/NOTICE b/asterixdb/NOTICE
index b4729a8..4aabe27 100644
--- a/asterixdb/NOTICE
+++ b/asterixdb/NOTICE
@@ -1,5 +1,5 @@
Apache AsterixDB
-Copyright 2015-2020 The Apache Software Foundation
+Copyright 2015-2021 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 8270d71..169bcb6 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -437,6 +437,10 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
+ <artifactId>aws-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index f3a36ff..0bc4c40 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -50,8 +50,11 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
public class AwsS3InputStreamFactory implements IInputStreamFactory {
@@ -88,10 +91,6 @@
this.configuration = configuration;
ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
- String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
-
- List<S3Object> filesOnly = new ArrayList<>();
-
// Ensure the validity of include/exclude
ExternalDataUtils.AwsS3.validateIncludeExclude(configuration);
@@ -126,35 +125,24 @@
p = (matchers, key) -> true;
}
+ // Get all objects in a bucket and extract the paths to files
+ List<S3Object> filesOnly;
+ String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
- // Get all objects in a bucket and extract the paths to files
- ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
- ExternalDataUtils.AwsS3.setPrefix(configuration, listObjectsBuilder);
-
- ListObjectsV2Response listObjectsResponse;
- boolean done = false;
- String newMarker = null;
-
try {
- while (!done) {
- // List the objects from the start, or from the last marker in case of truncated result
- if (newMarker == null) {
- listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+ filesOnly = listS3Objects(s3Client, container, matchersList, p);
+ } catch (S3Exception ex) {
+ // New API is not implemented, try falling back to old API
+ try {
+ // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+ if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+ filesOnly = oldApiListS3Objects(s3Client, container, matchersList, p);
} else {
- listObjectsResponse =
- s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ throw ex;
}
-
- // Collect the paths to files only
- collectAndFilterFiles(listObjectsResponse.contents(), p, matchersList, filesOnly);
-
- // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
- if (!listObjectsResponse.isTruncated()) {
- done = true;
- } else {
- newMarker = listObjectsResponse.nextContinuationToken();
- }
+ } catch (SdkException ex2) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage());
}
} catch (SdkException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
@@ -179,6 +167,84 @@
}
/**
+ * Uses the latest API to retrieve the objects from the storage.
+ *
+ * @param s3Client S3 client
+ * @param container container name
+ * @param matchersList include/exclude matchers to apply
+ * @param predicate predicate to use for comparison
+ */
+ private List<S3Object> listS3Objects(S3Client s3Client, String container, List<Matcher> matchersList,
+ BiPredicate<List<Matcher>, String> predicate) {
+ String newMarker = null;
+ List<S3Object> filesOnly = new ArrayList<>();
+
+ ListObjectsV2Response listObjectsResponse;
+ ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
+ listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration));
+
+ while (true) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ }
+
+ // Collect the paths to files only
+ collectAndFilterFiles(listObjectsResponse.contents(), predicate, matchersList, filesOnly);
+
+ // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+ if (!listObjectsResponse.isTruncated()) {
+ break;
+ } else {
+ newMarker = listObjectsResponse.nextContinuationToken();
+ }
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage
+ *
+ * @param s3Client S3 client
+ * @param container container name
+ * @param matchersList include/exclude matchers to apply
+ * @param predicate predicate to use for comparison
+ */
+ private List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, List<Matcher> matchersList,
+ BiPredicate<List<Matcher>, String> predicate) {
+ String newMarker = null;
+ List<S3Object> filesOnly = new ArrayList<>();
+
+ ListObjectsResponse listObjectsResponse;
+ ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
+ listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration));
+
+ while (true) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
+ }
+
+ // Collect the paths to files only
+ collectAndFilterFiles(listObjectsResponse.contents(), predicate, matchersList, filesOnly);
+
+ // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+ if (!listObjectsResponse.isTruncated()) {
+ break;
+ } else {
+ newMarker = listObjectsResponse.nextMarker();
+ }
+ }
+
+ return filesOnly;
+ }
+
+ /**
* AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered
* a file if it does not end up with a "/" which is the separator in a folder structure.
*
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 252ed5b..53306c5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -278,6 +278,8 @@
public static final String EMPTY_FIELD = "empty value";
public static final String INVALID_VAL = "invalid value";
+ public static final String DEFINITION_FIELD_NAME = "definition";
+
public static class AwsS3 {
private AwsS3() {
throw new AssertionError("do not instantiate");
@@ -287,7 +289,6 @@
public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
public static final String CONTAINER_NAME_FIELD_NAME = "container";
- public static final String DEFINITION_FIELD_NAME = "definition";
public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index fc31286..8206d4c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -69,8 +69,12 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Response;
public class ExternalDataUtils {
@@ -473,7 +477,7 @@
switch (type) {
case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
- ExternalDataUtils.AwsS3.validateProperties(configuration, srcLoc, collector);
+ AwsS3.validateProperties(configuration, srcLoc, collector);
break;
default:
// Nothing needs to be done
@@ -587,6 +591,19 @@
return result.toString();
}
+ /**
+ * Adjusts the prefix (if needed) and returns it
+ *
+ * @param configuration configuration
+ */
+ public static String getPrefix(Map<String, String> configuration) {
+ String definition = configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+ if (definition != null && !definition.isEmpty()) {
+ return definition + (!definition.endsWith("/") ? "/" : "");
+ }
+ return "";
+ }
+
public static class AwsS3 {
private AwsS3() {
throw new AssertionError("do not instantiate");
@@ -642,23 +659,9 @@
}
/**
- * Sets the prefix for the list objects builder if it is available
- *
- * @param configuration configuration
- * @param builder builder
- */
- public static void setPrefix(Map<String, String> configuration, ListObjectsV2Request.Builder builder) {
- String definition = configuration.get(ExternalDataConstants.AwsS3.DEFINITION_FIELD_NAME);
- if (definition != null) {
- builder.prefix(definition + (!definition.isEmpty() && !definition.endsWith("/") ? "/" : ""));
- }
- }
-
- /**
* Validate external dataset properties
*
* @param configuration properties
- *
* @throws CompilationException Compilation exception
*/
public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
@@ -672,26 +675,26 @@
validateIncludeExclude(configuration);
// Check if the bucket is present
- S3Client s3Client = null;
+ S3Client s3Client = buildAwsS3Client(configuration);;
+ S3Response response;
+ boolean useOldApi = false;
+ String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String prefix = getPrefix(configuration);
+
try {
- String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
- s3Client = buildAwsS3Client(configuration);
- ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
- setPrefix(configuration, listObjectsBuilder);
-
- ListObjectsV2Response response =
- s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
-
- if (response.contents().isEmpty() && collector.shouldWarn()) {
- Warning warning =
- WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- collector.warn(warning);
- }
-
- // Returns 200 only in case the bucket exists, however, otherwise, throws an exception. However, to
- // ensure coverage, check if the result is successful as well and not only catch exceptions
- if (!response.sdkHttpResponse().isSuccessful()) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+ response = isBucketEmpty(s3Client, container, prefix, false);
+ } catch (S3Exception ex) {
+ // Method not implemented, try falling back to old API
+ try {
+ // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
+ if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+ useOldApi = true;
+ response = isBucketEmpty(s3Client, container, prefix, true);
+ } else {
+ throw ex;
+ }
+ } catch (SdkException ex2) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage());
}
} catch (SdkException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
@@ -700,6 +703,44 @@
CleanupUtils.close(s3Client, null);
}
}
+
+ boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
+ : ((ListObjectsV2Response) response).contents().isEmpty();
+ if (isEmpty && collector.shouldWarn()) {
+ Warning warning =
+ WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+
+ // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to
+ // ensure coverage, check if the result is successful as well and not only catch exceptions
+ if (!response.sdkHttpResponse().isSuccessful()) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+ }
+ }
+
+ /**
+ * Checks for a single object in the specified bucket to determine if the bucket is empty or not.
+ *
+ * @param s3Client s3 client
+ * @param container the container name
+ * @param prefix Prefix to be used
+ * @param useOldApi flag whether to use the old API or not
+ *
+ * @return returns the S3 response
+ */
+ private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
+ S3Response response;
+ if (useOldApi) {
+ ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
+ listObjectsBuilder.prefix(prefix);
+ response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
+ } else {
+ ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
+ listObjectsBuilder.prefix(prefix);
+ response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
+ }
+ return response;
}
/**
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 72a658e..5b7e6c9 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1424,6 +1424,11 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
+ <artifactId>aws-core</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>${awsjavasdk.version}</version>
<exclusions>
diff --git a/hyracks-fullstack/NOTICE b/hyracks-fullstack/NOTICE
index 95fe98a..57c58439 100644
--- a/hyracks-fullstack/NOTICE
+++ b/hyracks-fullstack/NOTICE
@@ -1,5 +1,5 @@
Apache Hyracks and Algebricks
-Copyright 2015-2020 The Apache Software Foundation
+Copyright 2015-2021 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).