diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 8bd7a51..4d5288c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -41,6 +42,7 @@
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
 
 public class AwsS3InputStream extends AbstractMultipleInputStream {
 
@@ -51,6 +53,7 @@
     private final int bufferSize;
 
     private final S3Client s3Client;
+    private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
     // File fields
     private final List<String> filePaths;
@@ -84,17 +87,10 @@
         GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
         GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(fileName).build();
 
-        // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
-        // the header, then the S3 stream gets closed in the close method
-        try {
-            in = s3Client.getObject(getObjectRequest);
-        } catch (NoSuchKeyException ex) {
-            LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
-                    + getObjectRequest.bucket());
-            nextFileIndex++;
+        boolean isAvailableStream = doGetInputStream(getObjectRequest);
+        nextFileIndex++;
+        if (!isAvailableStream) {
             return advance();
-        } catch (SdkException ex) {
-            throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
         }
 
         // Use gzip stream if needed
@@ -103,13 +99,50 @@
         }
 
         // Current file ready, point to the next file
-        nextFileIndex++;
         if (notificationHandler != null) {
             notificationHandler.notifyNewSource();
         }
         return true;
     }
 
+    /**
+     * Get the input stream. If an error is encountered, depending on the error code, a retry might be favorable.
+     *
+     * @return true
+     */
+    private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+        int retries = 0;
+        while (retries < MAX_RETRIES) {
+            try {
+                in = s3Client.getObject(request);
+                break;
+            } catch (NoSuchKeyException ex) {
+                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
+                        + request.bucket());
+                return false;
+            } catch (S3Exception ex) {
+                if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+                    throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                }
+                LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
+
+                // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 3 ? 1 : 2));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            } catch (SdkException ex) {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+            }
+        }
+        return true;
+    }
+
+    private boolean shouldRetry(String errorCode, int currentRetry) {
+        return currentRetry < MAX_RETRIES && AwsS3.isRetryableError(errorCode);
+    }
+
     private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
         try {
             return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 0bc4c40..715c5df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -136,7 +136,7 @@
             // New API is not implemented, try falling back to old API
             try {
                 // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                if (ex.awsErrorDetails().errorCode().equals(AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
                     filesOnly = oldApiListS3Objects(s3Client, container, matchersList, p);
                 } else {
                     throw ex;
@@ -312,7 +312,7 @@
         return smallest;
     }
 
-    private static class PartitionWorkLoadBasedOnSize implements Serializable {
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
         private static final long serialVersionUID = 1L;
         private final List<String> filePaths = new ArrayList<>();
         private long totalSize = 0;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 1911083..0f4117b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -291,5 +291,14 @@
         public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey";
         public static final String CONTAINER_NAME_FIELD_NAME = "container";
         public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+
+        // AWS S3 specific error codes
+        public static final String ERROR_INTERNAL_ERROR = "InternalError";
+        public static final String ERROR_SLOW_DOWN = "SlowDown";
+        public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+
+        public static boolean isRetryableError(String errorCode) {
+            return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
+        }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 363ec74..a6b8b47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -710,7 +710,8 @@
                 // Method not implemented, try falling back to old API
                 try {
                     // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
-                    if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) {
+                    if (ex.awsErrorDetails().errorCode()
+                            .equals(ExternalDataConstants.AwsS3.ERROR_METHOD_NOT_IMPLEMENTED)) {
                         useOldApi = true;
                         response = isBucketEmpty(s3Client, container, prefix, true);
                     } else {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
new file mode 100644
index 0000000..f0a2223
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/awss3/AwsS3Test.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.awss3;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_INTERNAL_ERROR;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.ERROR_SLOW_DOWN;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStreamFactory;
+import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class AwsS3Test {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWorkloadDistribution() throws Exception {
+        AwsS3InputStreamFactory factory = new AwsS3InputStreamFactory();
+
+        List<S3Object> s3Objects = new ArrayList<>();
+        final int partitionsCount = 3;
+
+        // Create S3 objects, 9 objects, on 3 partitions, they should be 600 total size on each partition
+        S3Object.Builder builder = S3Object.builder();
+        s3Objects.add(builder.key("1.json").size(100L).build());
+        s3Objects.add(builder.key("2.json").size(100L).build());
+        s3Objects.add(builder.key("3.json").size(100L).build());
+        s3Objects.add(builder.key("4.json").size(200L).build());
+        s3Objects.add(builder.key("5.json").size(200L).build());
+        s3Objects.add(builder.key("6.json").size(200L).build());
+        s3Objects.add(builder.key("7.json").size(300L).build());
+        s3Objects.add(builder.key("8.json").size(300L).build());
+        s3Objects.add(builder.key("9.json").size(300L).build());
+
+        // invoke the distributeWorkLoad method
+        Method distributeWorkloadMethod =
+                AwsS3InputStreamFactory.class.getDeclaredMethod("distributeWorkLoad", List.class, int.class);
+        distributeWorkloadMethod.setAccessible(true);
+        distributeWorkloadMethod.invoke(factory, s3Objects, partitionsCount);
+
+        // get the partitionWorkLoadsBasedOnSize field and verify the result
+        Field distributeWorkloadField = AwsS3InputStreamFactory.class.getDeclaredField("partitionWorkLoadsBasedOnSize");
+        distributeWorkloadField.setAccessible(true);
+        List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize> workloads =
+                (List<AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize>) distributeWorkloadField.get(factory);
+
+        for (AwsS3InputStreamFactory.PartitionWorkLoadBasedOnSize workload : workloads) {
+            Assert.assertEquals(workload.getTotalSize(), 600);
+        }
+    }
+
+    @Test
+    public void s3InternalError() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with internal error code
+        AwsErrorDetails errorDetails = AwsErrorDetails.builder().errorCode(ERROR_INTERNAL_ERROR)
+                .errorMessage("Internal Error from AWS").build();
+        S3Exception internalErrorEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(internalErrorEx);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not internal error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. Internal Error from AWS"));
+        }
+    }
+
+    @Test
+    public void s3SlowDown() throws Exception {
+        // S3Client mock
+        S3Client s3ClientMock = Mockito.mock(S3Client.class);
+
+        // Prepare S3Exception with slow down error code
+        AwsErrorDetails errorDetails =
+                AwsErrorDetails.builder().errorCode(ERROR_SLOW_DOWN).errorMessage("SlowDown Error from AWS").build();
+        S3Exception slowDownEx = (S3Exception) S3Exception.builder().awsErrorDetails(errorDetails).build();
+        Mockito.when(s3ClientMock.getObject(GetObjectRequest.builder().build())).thenThrow(slowDownEx);
+
+        // Set S3Client mock
+        AwsS3InputStream inputStreamMock = Mockito.mock(AwsS3InputStream.class);
+
+        // Set S3Client
+        Field s3ClientField = AwsS3InputStream.class.getDeclaredField("s3Client");
+        s3ClientField.setAccessible(true);
+        s3ClientField.set(inputStreamMock, s3ClientMock);
+
+        // doGetInputStream method
+        Method doGetInputStreamMethod =
+                AwsS3InputStream.class.getDeclaredMethod("doGetInputStream", GetObjectRequest.class);
+        doGetInputStreamMethod.setAccessible(true);
+
+        try {
+            doGetInputStreamMethod.invoke(inputStreamMock, GetObjectRequest.builder().build());
+        } catch (Exception ex) {
+            Assert.assertTrue("Not SlowDown error", ex.getCause() instanceof IFormattedException
+                    && ex.getCause().toString().contains("ASX1108: External source error. SlowDown Error from AWS"));
+        }
+    }
+}
