[ASTERIXDB-2858][EXT]: Retry upon failure for S3 retryable errors

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Retry upon failure for S3 retryable errors.

Change-Id: Icec828b119fd959760281c4f4cc49449b779546e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10743
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Till Westmann
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
index 898f828..b37bce7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -52,9 +52,6 @@
 
         // Finished reading all the files
         if (nextFileIndex >= filePaths.size()) {
-            if (in != null) {
-                CleanupUtils.close(in, null);
-            }
             return false;
         }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 48035f3..4288cc4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -31,18 +32,22 @@
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.util.LogRedactionUtil;
 
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 public class AwsS3InputStream extends AbstractExternalInputStream {
 
     private final S3Client s3Client;
     private final String bucket;
+    private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
     public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
         super(configuration, filePaths);
@@ -58,24 +63,75 @@
 
         // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
         // the header, then the S3 stream gets closed in the close method
-        try {
-            in = s3Client.getObject(getObjectRequest);
-        } catch (NoSuchKeyException ex) {
-            LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
-                    + getObjectRequest.bucket());
+        if (!doGetInputStream(getObjectRequest)) {
             return false;
-        } catch (SdkException ex) {
-            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
         }
 
         // Use gzip stream if needed
-        String lowerCaseFileName = fileName.toLowerCase();
-        if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
+        if (StringUtils.endsWithIgnoreCase(fileName, ".gz") || StringUtils.endsWithIgnoreCase(fileName, ".gzip")) {
             in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
         }
         return true;
     }
 
+    /**
+     * Get the input stream. If an error is encountered, depending on the error code, a retry might be favorable.
+     *
+     * @return true
+     */
+    private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+        int retries = 0;
+        while (retries < MAX_RETRIES) {
+            try {
+                in = s3Client.getObject(request);
+                break;
+            } catch (NoSuchKeyException ex) {
+                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
+                        + request.bucket());
+                return false;
+            } catch (S3Exception ex) {
+                if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+                LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
+
+                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            } catch (SdkException ex) {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        }
+        return true;
+    }
+
+    private boolean shouldRetry(String errorCode, int currentRetry) {
+        return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (in != null) {
+            CleanupUtils.close(in, null);
+        }
+        if (s3Client != null) {
+            CleanupUtils.close(s3Client, null);
+        }
+    }
+
+    @Override
+    public boolean stop() {
+        try {
+            close();
+        } catch (IOException e) {
+            // Ignore
+        }
+        return false;
+    }
+
     private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
         try {
             return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 747fcca..8197524 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -78,7 +78,7 @@
             // New API is not implemented, try falling back to old API
             try {
                 // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                if (ex.awsErrorDetails().errorCode().equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
                     filesOnly = oldApiListS3Objects(s3Client, container, includeExcludeMatcher);
                 } else {
                     throw ex;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 45d15df..047fe2a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -300,6 +300,15 @@
         public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
         public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken";
         public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+        // AWS S3 specific error codes
+        public static final String ERROR_INTERNAL_ERROR = "InternalError";
+        public static final String ERROR_SLOW_DOWN = "SlowDown";
+        public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+        public static boolean isRetryableError(String errorCode) {
+            return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+        }
     }
 
     public static class AzureBlob {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d1bbe89..2d25d08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED;
 import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.BLOB_ENDPOINT_FIELD_NAME;
@@ -785,7 +786,7 @@
                 // Method not implemented, try falling back to old API
                 try {
                     // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                    if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                    if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
                         useOldApi = true;
                         response = isBucketEmpty(s3Client, container, prefix, true);
                     } else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
index 9cdeb16..90ea04b 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/AwsS3Test.java
@@ -18,16 +18,26 @@
  */
 package org.apache.asterix.external.input.record.reader.awss3;
 
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
 import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.apache.hyracks.api.exceptions.IFormattedException;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 public class AwsS3Test {
@@ -70,4 +80,66 @@
             Assert.assertEquals(workload.getTotalSize(), 600);
         }
     }
+
+    @Test
+    public void s3InternalError() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with internal error code
+        AwsErrorDetails errorDetails = AwsErrorDetails.builder().errorCode(ERROR_INTERNAL_ERROR)
+                .errorMessage("Internal Error from AWS").build();
+        S3Exception internalErrorEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(internalErrorEx);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not internal error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. Internal Error from AWS"));
+        }
+    }
+
+    @Test
+    public void s3SlowDown() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with slow down error code
+        AwsErrorDetails errorDetails =
+                AwsErrorDetails.builder().errorCode(ERROR_SLOW_DOWN).errorMessage("SlowDown Error from AWS").build();
+        S3Exception slowDownEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(slowDownEx);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+
+        // Set S3Client
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not SlowDown error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. SlowDown Error from AWS"));
+        }
+    }
 }