[ASTERIXDB-3616][EXT]: Add retry logic for external cloud writers

Ext-ref: MB-66723
Change-Id: Ie9fcea374a4a04286d82e9323c5d677f975707fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19825
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index d332944..0baebc7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -23,11 +23,13 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Set;
+import java.util.function.Predicate;
 
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.io.request.ICloudRetryPredicate;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -168,4 +170,10 @@
      * Performs any necessary closing and cleaning up
      */
     void close() throws HyracksDataException;
+
+    Predicate<Exception> getObjectNotFoundExceptionPredicate();
+
+    default ICloudRetryPredicate getRetryUnlessNotFound() {
+        return ex -> Predicate.not(getObjectNotFoundExceptionPredicate()).test(ex);
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index ccd6da1..30148f4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -26,6 +26,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -143,6 +144,11 @@
         cloudClient.close();
     }
 
+    @Override
+    public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+        return cloudClient.getObjectNotFoundExceptionPredicate();
+    }
+
     private static void fail() throws HyracksDataException {
         double prob = RANDOM.nextInt(100) / 100.0d;
         if (prob < ERROR_RATE.get()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 01a8b02..1195b8c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -34,6 +34,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
 
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -315,6 +316,11 @@
         s3Client.close();
     }
 
+    @Override
+    public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+        return ex -> ex instanceof NoSuchKeyException;
+    }
+
     /**
      * FOR TESTING ONLY
      */
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
index 84d68e3..91a4fba 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -34,6 +34,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -390,6 +391,11 @@
         // Hence this implementation is a no op.
     }
 
+    @Override
+    public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+        return ex -> (ex instanceof BlobStorageException bse) && bse.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND);
+    }
+
     private static BlobContainerClient buildClient(AzBlobStorageClientConfig config) {
         BlobContainerClientBuilder blobContainerClientBuilder =
                 new BlobContainerClientBuilder().containerName(config.getBucket()).endpoint(getEndpoint(config));
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 62c1835..bd0a044 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -32,6 +32,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Predicate;
 
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.CloudFile;
@@ -301,6 +302,11 @@
         }
     }
 
+    @Override
+    public Predicate<Exception> getObjectNotFoundExceptionPredicate() {
+        return ex -> (ex instanceof BaseServiceException bse)&& bse.getCode() == 404;
+    }
+
     private static Storage buildClient(GCSClientConfig config) throws HyracksDataException {
         StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
         builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
index d02b34e..f93ae92 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.writer;
 
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption;
 
 import org.apache.asterix.cloud.CloudOutputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -33,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
 import org.apache.hyracks.data.std.api.IValueReference;
 
 import com.google.common.base.Utf8;
@@ -60,7 +62,7 @@
 
     @Override
     public final void open() throws HyracksDataException {
-        printer.open();
+        runWithRetryIfSdkException(printer::open);
     }
 
     @Override
@@ -69,7 +71,8 @@
             return;
         }
 
-        if (partitionedPath && !cloudClient.isEmptyPrefix(bucket, directory)) {
+        boolean emptyPrefix = runWithNoRetryOnInterruption(() -> cloudClient.isEmptyPrefix(bucket, directory));
+        if (partitionedPath && !emptyPrefix) {
             throw new RuntimeDataException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, directory);
         }
     }
@@ -91,7 +94,7 @@
     @Override
     public final void write(IValueReference value) throws HyracksDataException {
         try {
-            printer.print(value);
+            runWithRetryIfSdkException(() -> printer.print(value));
         } catch (HyracksDataException e) {
             throw e;
         } catch (Exception e) {
@@ -106,9 +109,9 @@
     public final void abort() throws HyracksDataException {
         try {
             if (cloudWriter != null) {
-                cloudWriter.abort();
+                runWithRetryIfSdkException(cloudWriter::abort);
             }
-            printer.close();
+            runWithRetryIfSdkException(printer::close);
         } catch (HyracksDataException e) {
             throw e;
         } catch (Exception e) {
@@ -122,7 +125,7 @@
     @Override
     public final void close() throws HyracksDataException {
         try {
-            printer.close();
+            runWithRetryIfSdkException(printer::close);
         } catch (HyracksDataException e) {
             throw e;
         } catch (Exception e) {
@@ -139,6 +142,10 @@
 
     abstract boolean isSdkException(Exception e);
 
+    private void runWithRetryIfSdkException(ICloudRequest request) throws HyracksDataException {
+        runWithNoRetryOnInterruption(request, this::isSdkException);
+    }
+
     private boolean checkAndWarnExceedingMaxLength(String fullPath) {
         boolean exceeding = isExceedingMaxLength(fullPath, getPathMaxLengthInBytes());
         if (exceeding && warningCollector.shouldWarn()) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
index 1de64d0..2007e54 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java
@@ -20,6 +20,7 @@
 
 import static org.apache.asterix.cloud.writer.AbstractCloudExternalFileWriter.isExceedingMaxLength;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -40,6 +41,7 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -119,7 +121,7 @@
                         ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
             }
 
-            if (!testClient.isEmptyPrefix(bucket, staticPath)) {
+            if (!runWithNoRetryOnInterruption(() -> testClient.isEmptyPrefix(bucket, staticPath))) {
                 // Ensure that the static path is empty
                 throw new CompilationException(ErrorCode.DIRECTORY_IS_NOT_EMPTY, pathSourceLocation, staticPath);
             }
@@ -135,10 +137,16 @@
         Random random = new Random();
         String pathPrefix = "testFile";
         String path = pathPrefix + random.nextInt();
-        while (testClient.exists(bucket, path)) {
+
+        String existsFinalPath = path;
+        ICloudReturnableRequest<Boolean> existsRequest = () -> testClient.exists(bucket, existsFinalPath);
+        while (runWithNoRetryOnInterruption(existsRequest, testClient.getRetryUnlessNotFound())) {
             path = pathPrefix + random.nextInt();
+            String existsFinalPathUpdated = path;
+            existsRequest = () -> testClient.exists(bucket, existsFinalPathUpdated);
         }
 
+        final String finalPath = path;
         long writeValue = random.nextLong();
         byte[] data = new byte[Long.BYTES];
         LongPointable.setLong(data, 0, writeValue);
@@ -146,28 +154,29 @@
         ICloudWriter writer = testClient.createWriter(bucket, path, bufferProvider);
         boolean aborted = false;
         try {
-            writer.write(data, 0, data.length);
+            runWithNoRetryOnInterruption(() -> writer.write(data, 0, data.length));
         } catch (HyracksDataException e) {
-            writer.abort();
+            runWithNoRetryOnInterruption(writer::abort);
             aborted = true;
             throw e;
         } finally {
             if (writer != null && !aborted) {
-                writer.finish();
+                runWithNoRetryOnInterruption(writer::finish);
             }
         }
 
         try {
-            long readValue = LongPointable.getLong(testClient.readAllBytes(bucket, path), 0);
+            byte[] bytes = runWithNoRetryOnInterruption(() -> testClient.readAllBytes(bucket, finalPath));
+            long readValue = LongPointable.getLong(bytes, 0);
             if (writeValue != readValue) {
-                // This should never happen unless S3 is messed up. But log for sanity check
+                // This should never happen unless cloud storage is messed up. But log for sanity check
                 LOGGER.warn(
                         "The writer can write but the written values wasn't successfully read back (wrote: {}, read:{})",
                         writeValue, readValue);
             }
         } finally {
             // Delete the written file
-            testClient.deleteObjects(bucket, Collections.singleton(path));
+            runWithNoRetryOnInterruption(() -> testClient.deleteObjects(bucket, Collections.singleton(finalPath)));
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
index 4dab80d..3ace317 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.cloud.io.request;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 
 /**
@@ -30,5 +31,5 @@
     /**
      * Run pre-retry routine before reattempting {@link ICloudRequest} or {@link ICloudReturnableRequest}
      */
-    void beforeRetry();
+    void beforeRetry() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetryPredicate.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetryPredicate.java
new file mode 100644
index 0000000..bcaae11
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetryPredicate.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.util.function.Predicate;
+
+/**
+ * An interface for a condition that determines whether a cloud operation should stop or not.
+ * If met, the encountered exception is re-thrown as root cause in HyracksDataException
+ */
+@FunctionalInterface
+public interface ICloudRetryPredicate extends Predicate<Exception> {
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index da9c8fb..e011900 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
 import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetryPredicate;
 import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
 import org.apache.hyracks.util.ExponentialRetryPolicy;
 import org.apache.hyracks.util.IRetryPolicy;
@@ -55,7 +56,8 @@
     private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
     private static final long MAX_DELAY_BETWEEN_RETRIES = getMaxDelayBetweenRetries();
 
-    private static final ICloudBeforeRetryRequest NO_OP_RETRY = () -> {
+    private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e -> true;
+    private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> {
     };
 
     private CloudRetryableRequestUtil() {
@@ -67,7 +69,7 @@
      * @param request request to run
      */
     public static void run(ICloudRequest request) throws HyracksDataException {
-        run(request, NO_OP_RETRY);
+        run(request, NO_OP_BEFORE_RETRY);
     }
 
     /**
@@ -89,7 +91,7 @@
      * @return a value of return type
      */
     public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
-        return run(request, NO_OP_RETRY);
+        return run(request, NO_OP_BEFORE_RETRY);
     }
 
     /**
@@ -107,7 +109,7 @@
         try {
             while (true) {
                 try {
-                    return doRun(request, retry);
+                    return doRun(request, retry, RETRY_ALWAYS_PREDICATE);
                 } catch (Throwable e) {
                     // First, clear the interrupted flag
                     interrupted |= Thread.interrupted();
@@ -135,7 +137,7 @@
      * @param request request to run
      */
     public static void runWithNoRetryOnInterruption(ICloudRequest request) throws HyracksDataException {
-        doRun(request, NO_OP_RETRY);
+        runWithNoRetryOnInterruption(request, RETRY_ALWAYS_PREDICATE);
     }
 
     /**
@@ -151,14 +153,41 @@
         doRun(request, retry);
     }
 
-    private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+    public static void runWithNoRetryOnInterruption(ICloudRequest request, ICloudRetryPredicate shouldRetry)
             throws HyracksDataException {
+        doRun(request, NO_OP_BEFORE_RETRY, shouldRetry);
+    }
+
+    public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request) throws HyracksDataException {
+        return runWithNoRetryOnInterruption(request, NO_OP_BEFORE_RETRY, RETRY_ALWAYS_PREDICATE);
+    }
+
+    public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        return runWithNoRetryOnInterruption(request, retry, RETRY_ALWAYS_PREDICATE);
+    }
+
+    public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request,
+            ICloudRetryPredicate shouldRetry) throws HyracksDataException {
+        return runWithNoRetryOnInterruption(request, NO_OP_BEFORE_RETRY, shouldRetry);
+    }
+
+    public static <T> T runWithNoRetryOnInterruption(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry,
+            ICloudRetryPredicate shouldRetry) throws HyracksDataException {
+        return doRun(request, retry, shouldRetry);
+    }
+
+    private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry,
+            ICloudRetryPredicate shouldRetry) throws HyracksDataException {
         int attempt = 1;
         IRetryPolicy retryPolicy = null;
         while (true) {
             try {
                 return request.call();
             } catch (IOException | BaseServiceException | SdkException e) {
+                if (!shouldRetry.test(e)) {
+                    throw HyracksDataException.create(e);
+                }
                 if (retryPolicy == null) {
                     retryPolicy = new ExponentialRetryPolicy(NUMBER_OF_RETRIES, MAX_DELAY_BETWEEN_RETRIES);
                 }
@@ -182,7 +211,12 @@
     }
 
     private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
-        doRun(asReturnableRequest(request), retry);
+        doRun(request, retry, RETRY_ALWAYS_PREDICATE);
+    }
+
+    private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry, ICloudRetryPredicate shouldRetry)
+            throws HyracksDataException {
+        doRun(asReturnableRequest(request), retry, shouldRetry);
     }
 
     private static int getNumberOfRetries() {