[NO ISSUE][EXT]: Retry upon failure for S3 retryable errors
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
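- Retry the S3 GetObject call with a short backoff, bounded by MAX_RETRIES (5), when S3 returns a retryable error (InternalError or SlowDown).
- Add AwsS3Test covering workload distribution and the S3 retry path.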
Change-Id: I639fd7d43c2a6c28b3cc4247bf9ac5d3a23a387e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11883
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 8bd7a51..4d5288c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -41,6 +42,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
public class AwsS3InputStream extends AbstractMultipleInputStream {
@@ -51,6 +53,7 @@
private final int bufferSize;
private final S3Client s3Client;
+ private static final int MAX_RETRIES = 5; // Bounds the retries on retryable S3 errors (InternalError and SlowDown)
// File fields
private final List<String> filePaths;
@@ -84,17 +87,10 @@
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(fileName).build();
- // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
- // the header, then the S3 stream gets closed in the close method
- try {
- in = s3Client.getObject(getObjectRequest);
- } catch (NoSuchKeyException ex) {
- LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
- + getObjectRequest.bucket());
- nextFileIndex++;
+ boolean isAvailableStream = doGetInputStream(getObjectRequest);
+ nextFileIndex++;
+ if (!isAvailableStream) {
return advance();
- } catch (SdkException ex) {
- throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
// Use gzip stream if needed
@@ -103,13 +99,50 @@
}
// Current file ready, point to the next file
- nextFileIndex++;
if (notificationHandler != null) {
notificationHandler.notifyNewSource();
}
return true;
}
+    /**
+     * Opens the S3 object stream into {@code in}, retrying retryable S3 errors with a short backoff.
+     * Non-retryable errors, and retryable ones that outlive the retry budget, throw RuntimeDataException.
+     *
+     * @return true if the stream was opened, false if the key was not found
+     */
+    private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+        int retries = 0;
+        while (true) {
+            try {
+                in = s3Client.getObject(request);
+                return true;
+            } catch (NoSuchKeyException ex) {
+                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
+                        + request.bucket());
+                return false;
+            } catch (S3Exception ex) {
+                if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+                LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
+                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the flag set and stop retrying
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+            } catch (SdkException ex) {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        }
+    }
+
+ private boolean shouldRetry(String errorCode, int currentRetry) { // true only for a retryable code within budget
+ return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+ }
+
private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
try {
return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 0bc4c40..715c5df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -136,7 +136,7 @@
// New API is not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+ if (ex.awsErrorDetails().errorCode().equals(AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
filesOnly = oldApiListS3Objects(s3Client, container, matchersList, p);
} else {
throw ex;
@@ -312,7 +312,7 @@
return smallest;
}
- private static class PartitionWorkLoadBasedOnSize implements Serializable {
+ public static class PartitionWorkLoadBasedOnSize implements Serializable {
private static final long serialVersionUID = 1L;
private final List<String> filePaths = new ArrayList<>();
private long totalSize = 0;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1911083..0f4117b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -291,5 +291,14 @@
public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
public static final String CONTAINER_NAME_FIELD_NAME = "container";
public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+ // AWS S3 specific error codes
+ public static final String ERROR_INTERNAL_ERROR = "InternalError";
+ public static final String ERROR_SLOW_DOWN = "SlowDown";
+ public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+        public static boolean isRetryableError(String errorCode) { // null-safe: a null code is not retryable
+            return ERROR_INTERNAL_ERROR.equals(errorCode) || ERROR_SLOW_DOWN.equals(errorCode);
+        }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 363ec74..a6b8b47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -710,7 +710,8 @@
// Method not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
- if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+ if (ex.awsErrorDetails().errorCode()
+ .equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
useOldApi = true;
response = isBucketEmpty(s3Client, container, prefix, true);
} else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
new file mode 100644
index 0000000..f0a2223
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.awss3;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class AwsS3Test {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWorkloadDistribution() throws Exception {
+        AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory();
+
+        List<S3Object> s3Objects = new ArrayList<>();
+        final int partitionsCount = 3;
+
+        // 9 objects (3 x 100, 3 x 200, 3 x 300) over 3 partitions: each partition should total 600
+        S3Object.Builder builder = S3Object.builder();
+        s3Objects.add(builder.key("1.json").size(100L).build());
+        s3Objects.add(builder.key("2.json").size(100L).build());
+        s3Objects.add(builder.key("3.json").size(100L).build());
+        s3Objects.add(builder.key("4.json").size(200L).build());
+        s3Objects.add(builder.key("5.json").size(200L).build());
+        s3Objects.add(builder.key("6.json").size(200L).build());
+        s3Objects.add(builder.key("7.json").size(300L).build());
+        s3Objects.add(builder.key("8.json").size(300L).build());
+        s3Objects.add(builder.key("9.json").size(300L).build());
+
+        // Look up distributeWorkLoad reflectively and run it on the factory
+        Method distributeWorkloadMethod =
+                AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class);
+        distributeWorkloadMethod.setAccessible(true);
+        distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount);
+
+        // Read back the partitionWorkLoadsBasedOnSize field and verify the per-partition totals
+        Field distributeWorkloadField = AwsS3InputStreamFactory.class.getDeclaredField("partitionWorkLoadsBasedOnSize");
+        distributeWorkloadField.setAccessible(true);
+        List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize> workloads =
+                (List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField.get(factory);
+
+        for (AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+            Assert.assertEquals(600L, workload.getTotalSize()); // JUnit order is (expected, actual)
+        }
+    }
+
+    @Test
+    public void s3InternalError() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Make getObject throw an S3Exception carrying the InternalError code
+        AwsErrorDetails errorDetails = AwsErrorDetails.builder().errorCode(ERROR_INTERNAL_ERROR)
+                .errorMessage("Internal Error from AWS").build();
+        S3Exception internalErrorEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(internalErrorEx);
+
+        // Input stream instance (a Mockito mock) with the S3Client mock injected into its s3Client field
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // Look up doGetInputStream reflectively
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try { // NOTE(review): nothing is asserted when invoke() returns normally, so a missing exception goes unnoticed
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not internal error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. Internal Error from AWS"));
+        }
+    }
+
+    @Test
+    public void s3SlowDown() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Make getObject throw an S3Exception carrying the SlowDown code
+        AwsErrorDetails errorDetails =
+                AwsErrorDetails.builder().errorCode(ERROR_SLOW_DOWN).errorMessage("SlowDown Error from AWS").build();
+        S3Exception slowDownEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(slowDownEx);
+
+        // Input stream instance (a Mockito mock)
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+
+        // Inject the S3Client mock into its s3Client field
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // Look up doGetInputStream reflectively
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try { // NOTE(review): nothing is asserted when invoke() returns normally, so a missing exception goes unnoticed
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not SlowDown error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. SlowDown Error from AWS"));
+        }
+    }
+}