[ASTERIXDB-2858][EXT]: Retry upon failure for S3 retryable errors
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Retry upon failure for S3 retryable errors.
Change-Id: Icec828b119fd959760281c4f4cc49449b779546e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10743
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Till Westmann
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
index 898f828..b37bce7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -52,9 +52,6 @@
// Finished reading all the files
if (nextFileIndex >= filePaths.size()) {
- if (in != null) {
- CleanupUtils.close(in, null);
- }
return false;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 48035f3..4288cc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -31,18 +32,22 @@
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.util.LogRedactionUtil;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
public class AwsS3InputStream extends AbstractExternalInputStream {
private final S3Client s3Client;
private final String bucket;
+ private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
super(configuration, filePaths);
@@ -58,24 +63,75 @@
// Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
// the header, then the S3 stream gets closed in the close method
- try {
- in = s3Client.getObject(getObjectRequest);
- } catch (NoSuchKeyException ex) {
- LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
- + getObjectRequest.bucket());
+ if (!doGetInputStream(getObjectRequest)) {
return false;
- } catch (SdkException ex) {
- throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
// Use gzip stream if needed
- String lowerCaseFileName = fileName.toLowerCase();
- if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+ if (StringUtils.endsWithIgnoreCase(fileName, ".gz") || StringUtils.endsWithIgnoreCase(fileName, ".gzip")) {
in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
return true;
}
+ /**
+ * Get the input stream. If an error is encountered, depending on the error code, a retry might be favorable.
+ *
+ * @return true
+ */
+ private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+ int retries = 0;
+ while (retries < MAX_RETRIES) {
+ try {
+ in = s3Client.getObject(request);
+ break;
+ } catch (NoSuchKeyException ex) {
+ LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
+ + request.bucket());
+ return false;
+ } catch (S3Exception ex) {
+ if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+ throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
+
+ // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } catch (SdkException ex) {
+ throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ }
+ return true;
+ }
+
+ private boolean shouldRetry(String errorCode, int currentRetry) {
+ return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ CleanupUtils.close(in, null);
+ }
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
+ }
+
+ @Override
+ public boolean stop() {
+ try {
+ close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ return false;
+ }
+
private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
try {
return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 747fcca..8197524 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -78,7 +78,7 @@
// New API is not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+ if (ex.awsErrorDetails().errorCode().equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
filesOnly = oldApiListS3Objects(s3Client, container, includeExcludeMatcher);
} else {
throw ex;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 45d15df..047fe2a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -300,6 +300,15 @@
public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+ // AWS S3 specific error codes
+ public static final String ERROR_INTERNAL_ERROR = "InternalError";
+ public static final String ERROR_SLOW_DOWN = "SlowDown";
+ public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+ public static boolean isRetryableError(String errorCode) {
+ return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+ }
}
public static class AzureBlob {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d1bbe89..2d25d08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.util;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.BLOB_ENDPOINT_FIELD_NAME;
@@ -785,7 +786,7 @@
// Method not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+ if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
useOldApi = true;
response = isBucketEmpty(s3Client, container, prefix, true);
} else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 9cdeb16..90ea04b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,16 +18,26 @@
*/
package org.apache.asterix.external.input.record.reader.awss3;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.apache.hyracks.api.exceptions.IFormattedException;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
public class AwsS3Test {
@@ -70,4 +80,66 @@
Assert.assertEquals(workload.getTotalSize(), 600);
}
}
+
+ @Test
+ public void s3InternalError() throws Exception {
+ // S3Client mock
+ S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+ // Prepare S3Exception with internal error code
+ AwsErrorDetails errorDetails = AwsErrorDetails.builder().errorCode(ERROR_INTERNAL_ERROR)
+ .errorMessage("Internal Error from AWS").build();
+ S3Exception internalErrorEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+ Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(internalErrorEx);
+
+ // Set S3Client mock
+ AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+ Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+ s3ClientField.setAccessible(true);
+ s3ClientField.set(inputStreamMock, s3ClientMock);
+
+ // doGetInputStream method
+ Method doGetInputStreamMethod =
+ AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+ doGetInputStreamMethod.setAccessible(true);
+
+ try {
+ doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+ } catch (Exception ex) {
+ Assert.assertTrue("Not internal error", ex.getCause() instanceof IFormattedException
+ && ex.getCause().toString().contains("ASX1108: External source error. Internal Error from AWS"));
+ }
+ }
+
+ @Test
+ public void s3SlowDown() throws Exception {
+ // S3Client mock
+ S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+ // Prepare S3Exception with slow down error code
+ AwsErrorDetails errorDetails =
+ AwsErrorDetails.builder().errorCode(ERROR_SLOW_DOWN).errorMessage("SlowDown Error from AWS").build();
+ S3Exception slowDownEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+ Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(slowDownEx);
+
+ // Set S3Client mock
+ AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+
+ // Set S3Client
+ Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+ s3ClientField.setAccessible(true);
+ s3ClientField.set(inputStreamMock, s3ClientMock);
+
+ // doGetInputStream method
+ Method doGetInputStreamMethod =
+ AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+ doGetInputStreamMethod.setAccessible(true);
+
+ try {
+ doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+ } catch (Exception ex) {
+ Assert.assertTrue("Not SlowDown error", ex.getCause() instanceof IFormattedException
+ && ex.getCause().toString().contains("ASX1108: External source error. SlowDown Error from AWS"));
+ }
+ }
}