[ASTERIXDB-3581][EXT]: Do not retry GCS SDK requests if thread is interrupted

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Do not retry any GCS requests if the thread
  has been interrupted.

Ext-ref: MB-65548
Change-Id: I5deb956bda11ee2ca41a1c05616788b2d697622f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19525
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 007826b..6416df9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -233,7 +233,7 @@
         IBufferCacheReadContext defaultContext;
         if (isCloudDeployment()) {
             cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver,
-                    getCloudGuardian(cloudProperties), threadExecutor);
+                    getCloudGuardian(cloudProperties));
             persistenceIOManager = cloudConfigurator.getCloudIoManager();
             partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
             lockNotifier = cloudConfigurator.getLockNotifier();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 5eafe05..6928b64 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
         if (cloudDeployment) {
             cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
             ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver,
-                    getCloudGuardian(cloudProperties), controllerService.getExecutor());
+                    getCloudGuardian(cloudProperties));
         }
         IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 516ede2..8fefcf5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,7 +31,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.function.Predicate;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -85,13 +84,12 @@
     private final List<FileStore> drivePaths;
 
     public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
-            throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
         super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
                 ioManager.getQueueSize());
         this.nsPathResolver = nsPathResolver;
         this.bucket = cloudProperties.getStorageBucket();
-        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor);
+        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
         this.guardian = guardian;
         int numOfThreads = getIODevices().size() * getIOParallelism();
         writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index e494333..dced5b0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -70,12 +70,11 @@
     private final long diskCacheMonitoringInterval;
 
     private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
-            throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
         this.cloudProperties = cloudProperties;
         localIoManager = (IOManager) ioManager;
         diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
-        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor);
+        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian);
         physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
         lockNotifier = createLockNotifier(diskCacheManagerRequired);
         pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -132,22 +131,20 @@
     }
 
     public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor)
-            throws HyracksDataException {
-        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor);
+            INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException {
+        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian);
     }
 
     public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
-            throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
         if (policy == CloudCachePolicy.EAGER) {
-            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
+            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
         }
 
         boolean selective = policy == CloudCachePolicy.SELECTIVE;
-        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor);
+        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian);
     }
 
     private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b944161..b49f3ce 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.cloud;
 
-import java.util.concurrent.ExecutorService;
-
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -34,14 +32,13 @@
     }
 
     public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
-            throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
-            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor);
+            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian);
         }
 
-        return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
+        return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
     }
 
     public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 29e5da0..1cb6077 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,7 +23,6 @@
 import java.io.File;
 import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -49,9 +48,8 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
-            throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+        super(ioManager, cloudProperties, nsPathResolver, guardian);
     }
 
     /*
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 7a7ea06..1c5efd9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,7 +29,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -71,9 +70,9 @@
     private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor)
+            INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian)
             throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
+        super(ioManager, cloudProperties, nsPathResolver, guardian);
         accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
         if (selective) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index a0c8bd0..c98c6b4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.cloud.clients;
 
-import java.util.concurrent.ExecutorService;
-
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -40,8 +38,8 @@
         throw new AssertionError("do not instantiate");
     }
 
-    public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian,
-            ExecutorService executor) throws HyracksDataException {
+    public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
+            throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
         ICloudClient cloudClient;
         if (S3.equalsIgnoreCase(storageScheme)) {
@@ -49,7 +47,7 @@
             cloudClient = new S3CloudClient(config, guardian);
         } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
-            cloudClient = new GCSCloudClient(config, guardian, executor);
+            cloudClient = new GCSCloudClient(config, guardian);
         } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
             AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
             cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 16fb278..62c1835 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients.google.gcs;
 
 import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.DELETE_BATCH_SIZE;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
 
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -31,11 +32,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.function.Supplier;
 
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.CloudFile;
@@ -71,7 +67,6 @@
 import com.google.cloud.storage.Storage.CopyRequest;
 import com.google.cloud.storage.StorageBatch;
 import com.google.cloud.storage.StorageBatchResult;
-import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 
 public class GCSCloudClient implements ICloudClient {
@@ -81,15 +76,12 @@
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profilerLimiter;
     private final int writeBufferSize;
-    private final ExecutorService executor;
 
-    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian,
-            ExecutorService executor) {
+    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
         this.gcsClient = gcsClient;
         this.config = config;
         this.guardian = guardian;
         this.writeBufferSize = config.getWriteBufferSize();
-        this.executor = executor;
         long profilerInterval = config.getProfilerLogInterval();
         GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
         if (profilerInterval > 0) {
@@ -100,9 +92,8 @@
         guardian.setCloudClient(this);
     }
 
-    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor)
-            throws HyracksDataException {
-        this(config, buildClient(config), guardian, executor);
+    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) throws HyracksDataException {
+        this(config, buildClient(config), guardian);
     }
 
     @Override
@@ -121,12 +112,11 @@
     }
 
     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
-        Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
-                BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)));
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
+                BlobListOption.fields(Storage.BlobField.SIZE));
         Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -148,7 +138,7 @@
                 from.seek(offset + totalRead);
                 totalRead += from.read(buffer);
             }
-        } catch (IOException | StorageException ex) {
+        } catch (IOException | BaseServiceException ex) {
             throw HyracksDataException.create(ex);
         }
 
@@ -163,17 +153,14 @@
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
-        // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread
-        return runOpInterruptibly(() -> {
-            try {
-                return gcsClient.readAllBytes(blobId);
-            } catch (StorageException e) {
-                if (e.getCode() == 404) {
-                    return null;
-                }
-                throw e;
+        try {
+            return gcsClient.readAllBytes(blobId);
+        } catch (BaseServiceException ex) {
+            if (ex.getCode() == 404) {
+                return null;
             }
-        });
+            throw HyracksDataException.create(ex);
+        }
     }
 
     @Override
@@ -185,38 +172,33 @@
             reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length);
             reader.seek(offset);
             return Channels.newInputStream(reader);
-        } catch (StorageException | IOException ex) {
+        } catch (BaseServiceException | IOException ex) {
             throw new RuntimeException(CleanupUtils.close(reader, ex));
         }
     }
 
     @Override
-    public void write(String bucket, String path, byte[] data) throws HyracksDataException {
+    public void write(String bucket, String path, byte[] data) {
         guardian.checkWriteAccess(bucket, path);
         profilerLimiter.objectWrite();
         BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
-        // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread
-        runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
+        gcsClient.create(blobInfo, data);
     }
 
     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
+    public void copy(String bucket, String srcPath, FileReference destPath) {
         guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
-        // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread
-        runOpInterruptibly(() -> {
-            Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
-            for (Blob blob : blobs.iterateAll()) {
-                profilerLimiter.objectCopy();
-                BlobId source = blob.getBlobId();
-                String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
-                BlobId target = BlobId.of(bucket, targetName);
-                guardian.checkWriteAccess(bucket, targetName);
-                CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
-                gcsClient.copy(copyReq);
-            }
-            return null;
-        });
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
+        for (Blob blob : blobs.iterateAll()) {
+            profilerLimiter.objectCopy();
+            BlobId source = blob.getBlobId();
+            String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+            BlobId target = BlobId.of(bucket, targetName);
+            guardian.checkWriteAccess(bucket, targetName);
+            CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+            gcsClient.copy(copyReq);
+        }
     }
 
     @Override
@@ -234,8 +216,7 @@
                 guardian.checkWriteAccess(bucket, blobId.getName());
                 deleteResponses.add(batchRequest.delete(blobId));
             }
-            // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread
-            runOpInterruptibly(batchRequest::submit);
+            batchRequest.submit();
             Iterator<String> deletePathIter = paths.iterator();
             for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) {
                 String deletedPath = deletePathIter.next();
@@ -250,7 +231,7 @@
                     }
                 } catch (BaseServiceException e) {
                     LOGGER.warn("Failed to delete object {} while deleting {}", deletedPath, paths, e);
-                    throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath,
+                    throw RuntimeDataException.create(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath,
                             paths.toString());
                 }
             }
@@ -259,12 +240,11 @@
     }
 
     @Override
-    public long getObjectSize(String bucket, String path) throws HyracksDataException {
+    public long getObjectSize(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
-        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
-                Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
+        Blob blob =
+                gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
         if (blob == null) {
             return 0;
         }
@@ -272,22 +252,19 @@
     }
 
     @Override
-    public boolean exists(String bucket, String path) throws HyracksDataException {
+    public boolean exists(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
-        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
-                Storage.BlobGetOption.fields(Storage.BlobField.values())));
+        Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.values()));
         return blob != null && blob.exists();
     }
 
     @Override
-    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+    public boolean isEmptyPrefix(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
-        Page<Blob> blobs =
-                runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)));
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
         return !blobs.iterateAll().iterator().hasNext();
     }
 
@@ -298,12 +275,10 @@
     }
 
     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
         guardian.checkReadAccess(bucket, "/");
         profilerLimiter.objectsList();
-        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
-        Page<Blob> blobs =
-                runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)));
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
         ArrayNode objectsInfo = objectMapper.createArrayNode();
 
         List<Blob> objects = new ArrayList<>();
@@ -328,6 +303,7 @@
 
     private static Storage buildClient(GCSClientConfig config) throws HyracksDataException {
         StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
+        builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
 
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             builder.setHost(config.getEndpoint());
@@ -338,42 +314,4 @@
     private String stripCloudPrefix(String objectName) {
         return objectName.substring(config.getPrefix().length());
     }
-
-    private void runOpInterruptibly(Runnable operation) throws HyracksDataException {
-        try {
-            executor.submit(operation).get();
-        } catch (InterruptedException e) {
-            throw HyracksDataException.create(e);
-        } catch (ExecutionException e) {
-            throw HyracksDataException.create(e.getCause());
-        }
-    }
-
-    private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException {
-        Future<T> opTask = executor.submit(operation::get);
-        try {
-            return opTask.get();
-        } catch (InterruptedException e) {
-            cancelAndUnwind(opTask);
-            throw HyracksDataException.create(e);
-        } catch (ExecutionException e) {
-            throw HyracksDataException.create(e.getCause());
-        }
-    }
-
-    private static <T> void cancelAndUnwind(Future<T> opTask) {
-        opTask.cancel(true);
-        while (true) {
-            try {
-                opTask.get();
-            } catch (InterruptedException e1) {
-                continue;
-            } catch (CancellationException e1) {
-                LOGGER.debug("ignoring exception after cancel of op", e1);
-            } catch (ExecutionException e1) {
-                LOGGER.debug("ignoring exception after cancel of op", e1.getCause());
-            }
-            return;
-        }
-    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index cbbaf31..b9e7eee 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.clients.google.gcs;
 
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
+
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -64,6 +66,7 @@
         this.ioManager = ioManager;
         this.profiler = profiler;
         StorageOptions.Builder builder = StorageOptions.newBuilder();
+        builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             builder.setHost(config.getEndpoint());
         }
@@ -95,7 +98,6 @@
             downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
                     downConfig.setDownloadDirectory(entry.getKey()).build()));
         }
-        // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
         downloadJobs.forEach(DownloadJob::getDownloadResults);
     }
 
@@ -122,7 +124,6 @@
         }
         List<DownloadResult> results;
         for (DownloadJob job : downloadJobs) {
-            // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
             results = job.getDownloadResults();
             for (DownloadResult result : results) {
                 if (result.getStatus() != TransferStatus.SUCCESS) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 36bb94c..3a34365 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -67,8 +67,7 @@
     ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
-                ICloudGuardian.NoOpCloudGuardian.INSTANCE,
-                appCtx.getServiceContext().getControllerService().getExecutor());
+                ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 2f64c37..d89c872 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.cloud.gcs;
 
-import java.util.concurrent.Executors;
-
 import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -54,8 +52,7 @@
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
         GCSClientConfig config =
                 new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
-        CLOUD_CLIENT =
-                new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
+        CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
     private static void cleanup() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 2613f34..739dbde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -18,7 +18,23 @@
  */
 package org.apache.asterix.external.util.google.gcs;
 
+import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace;
+
+import java.util.concurrent.CancellationException;
+
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.api.gax.retrying.ResultRetryAlgorithm;
+import com.google.api.gax.retrying.TimedAttemptSettings;
+import com.google.cloud.ExceptionHandler;
+import com.google.cloud.storage.StorageRetryStrategy;
+
 public class GCSConstants {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private GCSConstants() {
         throw new AssertionError("do not instantiate");
     }
@@ -36,25 +52,87 @@
     // hadoop credentials
     public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type";
     public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED";
-    public static final String HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE = "SERVICE_ACCOUNT_JSON_KEYFILE";
-    public static final String HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH =
-            "google.cloud.auth.service.account.json.keyfile";
 
     // gs hadoop parameters
     public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable";
     public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
-    public static final String HADOOP_MAX_REQUESTS_PER_BATCH = "fs.gs.max.requests.per.batch";
-    public static final String HADOOP_BATCH_THREADS = "fs.gs.batch.threads";
 
-    public static class JSON_CREDENTIALS_FIELDS {
+    public static class JsonCredentials {
         public static final String PRIVATE_KEY_ID = "private_key_id";
         public static final String PRIVATE_KEY = "private_key";
         public static final String CLIENT_EMAIL = "client_email";
     }
 
-    public static class HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS {
+    public static class HadoopAuthServiceAccount {
         public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id";
         public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key";
         public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email";
     }
+
+    public static final StorageRetryStrategy DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
+    static {
+        StorageRetryStrategy defaultStrategy = StorageRetryStrategy.getDefaultStorageRetryStrategy();
+        ExceptionHandler defaultIdempotentHandler = (ExceptionHandler) defaultStrategy.getIdempotentHandler();
+        ExceptionHandler defaultNonIdempotentHandler = (ExceptionHandler) defaultStrategy.getNonidempotentHandler();
+
+        ResultRetryAlgorithm<Object> noRetryOnThreadInterruptIdempotentHandler = new ResultRetryAlgorithm<>() {
+            @Override
+            public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, Object prevResponse,
+                    TimedAttemptSettings prevSettings) {
+                return defaultIdempotentHandler.createNextAttempt(prevThrowable, prevResponse, prevSettings);
+            }
+
+            @Override
+            public boolean shouldRetry(Throwable prevThrowable, Object prevResponse) throws CancellationException {
+                if (ExceptionUtils.causedByInterrupt(prevThrowable) || Thread.currentThread().isInterrupted()) {
+                    interruptRequest(prevThrowable);
+                }
+                return defaultIdempotentHandler.shouldRetry(prevThrowable, prevResponse);
+            }
+        };
+
+        ResultRetryAlgorithm<Object> noRetryOnThreadInterruptNonIdempotentHandler = new ResultRetryAlgorithm<>() {
+            @Override
+            public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, Object prevResponse,
+                    TimedAttemptSettings prevSettings) {
+                return defaultNonIdempotentHandler.createNextAttempt(prevThrowable, prevResponse, prevSettings);
+            }
+
+            @Override
+            public boolean shouldRetry(Throwable prevThrowable, Object prevResponse) throws CancellationException {
+                if (ExceptionUtils.causedByInterrupt(prevThrowable) || Thread.currentThread().isInterrupted()) {
+                    interruptRequest(prevThrowable);
+                }
+                return defaultNonIdempotentHandler.shouldRetry(prevThrowable, prevResponse);
+            }
+        };
+
+        DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY = new StorageRetryStrategy() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ResultRetryAlgorithm<?> getIdempotentHandler() {
+                return noRetryOnThreadInterruptIdempotentHandler;
+            }
+
+            @Override
+            public ResultRetryAlgorithm<?> getNonidempotentHandler() {
+                return noRetryOnThreadInterruptNonIdempotentHandler;
+            }
+        };
+    }
+
+    /**
+     * Throwing a CancellationException will cause the GCS client to abort the whole operation, not only stop retrying
+     */
+    private static void interruptRequest(Throwable th) {
+        Thread.currentThread().interrupt();
+        CancellationException ex = new CancellationException("Request was interrupted, aborting retries and request");
+        if (th != null) {
+            ex.initCause(th);
+        }
+        String stackTrace = getStackTrace(ex);
+        LOGGER.debug("Request was interrupted, aborting retries and request\n{}", stackTrace);
+        throw ex;
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 481b7ff..d768c23 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -26,6 +26,7 @@
 import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
@@ -94,6 +95,7 @@
         String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 
         StorageOptions.Builder builder = StorageOptions.newBuilder();
+        builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
 
         // default credentials provider
         if (applicationDefaultCredentials != null) {
@@ -259,12 +261,12 @@
                 // Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
                 // in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in
                 // version 3.x.y, which also removed support for hadoop-2
-                conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY_ID,
-                        jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY_ID).asText());
-                conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY,
-                        jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY).asText());
-                conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.CLIENT_EMAIL,
-                        jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.CLIENT_EMAIL).asText());
+                conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
+                        jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
+                conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
+                        jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
+                conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
+                        jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
             } catch (JsonProcessingException e) {
                 throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials",
                         getMessageOrToString(e));
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index 0b7af91..483076b 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -61,6 +61,10 @@
         this.sourceLoc = sourceLoc;
         this.nodeId = nodeId;
         this.params = params;
+
+        if (cause instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 46c8ec2..50df1cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -43,8 +43,6 @@
         }
         if (cause instanceof HyracksDataException) {
             return (HyracksDataException) cause;
-        } else if (cause instanceof InterruptedException) {
-            Thread.currentThread().interrupt();
         } else if (cause instanceof Error) {
             throw (Error) cause;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 12f1095..b12c8df 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -79,6 +79,10 @@
         this.errorCode = intCode;
         this.nodeId = nodeId;
         this.params = params;
+
+        if (cause instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     protected HyracksException(IError errorCode, Throwable cause, SourceLocation sourceLoc, String nodeId,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index d616eb5..ddba3f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -128,6 +128,9 @@
     }
 
     public static Throwable getRootCause(Throwable e) {
+        if (e == null) {
+            return null;
+        }
         Throwable current = e;
         Throwable cause = e.getCause();
         while (cause != null && cause != current) {