[NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost during GCS ops

Ext-ref: MB-65432
Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 343baf0..de4a066 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -225,13 +225,15 @@
             IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
+        threadExecutor =
+                MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         CloudConfigurator cloudConfigurator;
         IDiskResourceCacheLockNotifier lockNotifier;
         IDiskCachedPageAllocator pageAllocator;
         IBufferCacheReadContext defaultContext;
         if (isCloudDeployment()) {
             cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver,
-                    getCloudGuardian(cloudProperties));
+                    getCloudGuardian(cloudProperties), threadExecutor);
             persistenceIOManager = cloudConfigurator.getCloudIoManager();
             partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
             lockNotifier = cloudConfigurator.getLockNotifier();
@@ -247,8 +249,6 @@
         }
 
         int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
-        threadExecutor =
-                MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator bufferAllocator = new HeapBufferAllocator();
         IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy(bufferAllocator, pageAllocator,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 78f574b..179b996 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -131,7 +131,7 @@
      * not supported, yet.
      */
     @Override
-    public SystemState getSystemState() throws ACIDException {
+    public SystemState getSystemState() throws ACIDException, HyracksDataException {
         //read checkpoint file
         Checkpoint checkpointObject = checkpointManager.getLatest();
         if (checkpointObject == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6928b64..5eafe05 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
         if (cloudDeployment) {
             cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
             ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver,
-                    getCloudGuardian(cloudProperties));
+                    getCloudGuardian(cloudProperties), controllerService.getExecutor());
         }
         IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 6f8126b..0da4e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -370,7 +370,7 @@
         return nodeStatus == NodeStatus.IDLE && (primaryCc == null || primaryCc.equals(registeredCc));
     }
 
-    private SystemState getCurrentSystemState() {
+    private SystemState getCurrentSystemState() throws HyracksDataException {
         final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
         SystemState state = recoveryMgr.getSystemState();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..516ede2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Predicate;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -84,12 +85,13 @@
     private final List<FileStore> drivePaths;
 
     public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
                 ioManager.getQueueSize());
         this.nsPathResolver = nsPathResolver;
         this.bucket = cloudProperties.getStorageBucket();
-        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
+        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor);
         this.guardian = guardian;
         int numOfThreads = getIODevices().size() * getIOParallelism();
         writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize());
@@ -106,7 +108,7 @@
      */
 
     @Override
-    public SystemState getSystemStateOnMissingCheckpoint() {
+    public SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException {
         Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles();
         CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
         if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
@@ -436,7 +438,7 @@
      * @param key   the key where the bytes will be written
      * @param bytes the bytes to write
      */
-    public final void put(String key, byte[] bytes) {
+    public final void put(String key, byte[] bytes) throws HyracksDataException {
         cloudClient.write(bucket, key, bytes);
     }
 
@@ -444,7 +446,7 @@
         return cloudClient;
     }
 
-    private Set<CloudFile> getCloudMetadataPartitionFiles() {
+    private Set<CloudFile> getCloudMetadataPartitionFiles() throws HyracksDataException {
         String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver,
                 MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
         return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index dced5b0..e494333 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -70,11 +70,12 @@
     private final long diskCacheMonitoringInterval;
 
     private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         this.cloudProperties = cloudProperties;
         localIoManager = (IOManager) ioManager;
         diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
-        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian);
+        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor);
         physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
         lockNotifier = createLockNotifier(diskCacheManagerRequired);
         pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -131,20 +132,22 @@
     }
 
     public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException {
-        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian);
+            INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor)
+            throws HyracksDataException {
+        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor);
     }
 
     public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
         if (policy == CloudCachePolicy.EAGER) {
-            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
+            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
         }
 
         boolean selective = policy == CloudCachePolicy.SELECTIVE;
-        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian);
+        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor);
     }
 
     private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b49f3ce..b944161 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -32,13 +34,14 @@
     }
 
     public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
-            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian);
+            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor);
         }
 
-        return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
+        return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
     }
 
     public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..29e5da0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -48,8 +49,9 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian);
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
+        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
     }
 
     /*
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..7a7ea06 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,6 +29,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -70,9 +71,9 @@
     private ILazyAccessor accessor;
 
     public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian)
+            INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor)
             throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian);
+        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
         accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
         if (selective) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index c98c6b4..a0c8bd0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.clients;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -38,8 +40,8 @@
         throw new AssertionError("do not instantiate");
     }
 
-    public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
-            throws HyracksDataException {
+    public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian,
+            ExecutorService executor) throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
         ICloudClient cloudClient;
         if (S3.equalsIgnoreCase(storageScheme)) {
@@ -47,7 +49,7 @@
             cloudClient = new S3CloudClient(config, guardian);
         } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
-            cloudClient = new GCSCloudClient(config, guardian);
+            cloudClient = new GCSCloudClient(config, guardian, executor);
         } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
             AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
             cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index fd82944..d332944 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -65,7 +65,7 @@
      * @param filter filter to apply
      * @return file names returned after applying the file name filter
      */
-    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter);
+    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException;
 
     /**
      * Performs a range-read from the specified bucket and path starting at the offset. The amount read is equal to the
@@ -107,7 +107,7 @@
      * @param path   path
      * @param data   data
      */
-    void write(String bucket, String path, byte[] data);
+    void write(String bucket, String path, byte[] data) throws HyracksDataException;
 
     /**
      * Copies an object from the source path to the destination path
@@ -116,7 +116,7 @@
      * @param srcPath  source path
      * @param destPath destination path
      */
-    void copy(String bucket, String srcPath, FileReference destPath);
+    void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException;
 
     /**
      * Deletes all objects at the specified bucket and paths
@@ -162,7 +162,7 @@
      * @param bucket       bucket name
      * @return {@link JsonNode} with stored objects' information
      */
-    JsonNode listAsJson(ObjectMapper objectMapper, String bucket);
+    JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException;
 
     /**
      * Performs any necessary closing and cleaning up
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index 28fa53e..2f61b31 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -66,7 +66,7 @@
     }
 
     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
         return cloudClient.listObjects(bucket, path, filter);
     }
 
@@ -88,12 +88,12 @@
     }
 
     @Override
-    public void write(String bucket, String path, byte[] data) {
+    public void write(String bucket, String path, byte[] data) throws HyracksDataException {
         cloudClient.write(bucket, path, data);
     }
 
     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) {
+    public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
         cloudClient.copy(bucket, srcPath, destPath);
     }
 
@@ -127,7 +127,7 @@
     }
 
     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
         return cloudClient.listAsJson(objectMapper, bucket);
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 924f34a..99cda9e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -31,6 +31,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.CloudFile;
@@ -76,12 +79,15 @@
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profilerLimiter;
     private final int writeBufferSize;
+    private final ExecutorService executor;
 
-    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
+    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian,
+            ExecutorService executor) {
         this.gcsClient = gcsClient;
         this.config = config;
         this.guardian = guardian;
         this.writeBufferSize = config.getWriteBufferSize();
+        this.executor = executor;
         long profilerInterval = config.getProfilerLogInterval();
         GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
         if (profilerInterval > 0) {
@@ -92,8 +98,9 @@
         guardian.setCloudClient(this);
     }
 
-    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) throws HyracksDataException {
-        this(config, buildClient(config), guardian);
+    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
+        this(config, buildClient(config), guardian, executor);
     }
 
     @Override
@@ -112,12 +119,12 @@
     }
 
     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
-                BlobListOption.fields(Storage.BlobField.SIZE));
-
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+        Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
+                BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)));
         Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -150,15 +157,21 @@
     }
 
     @Override
-    public byte[] readAllBytes(String bucket, String path) {
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
-        try {
-            return gcsClient.readAllBytes(blobId);
-        } catch (StorageException e) {
-            return null;
-        }
+        // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread
+        return runOpInterruptibly(() -> {
+            try {
+                return gcsClient.readAllBytes(blobId);
+            } catch (StorageException e) {
+                if (e.getCode() == 404) {
+                    return null;
+                }
+                throw e;
+            }
+        });
     }
 
     @Override
@@ -176,27 +189,32 @@
     }
 
     @Override
-    public void write(String bucket, String path, byte[] data) {
+    public void write(String bucket, String path, byte[] data) throws HyracksDataException {
         guardian.checkWriteAccess(bucket, path);
         profilerLimiter.objectWrite();
         BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
-        gcsClient.create(blobInfo, data);
+        // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread
+        runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
     }
 
     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) {
+    public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
         guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
-        for (Blob blob : blobs.iterateAll()) {
-            profilerLimiter.objectCopy();
-            BlobId source = blob.getBlobId();
-            String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
-            BlobId target = BlobId.of(bucket, targetName);
-            guardian.checkWriteAccess(bucket, targetName);
-            CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
-            gcsClient.copy(copyReq);
-        }
+        // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread
+        runOpInterruptibly(() -> {
+            Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
+            for (Blob blob : blobs.iterateAll()) {
+                profilerLimiter.objectCopy();
+                BlobId source = blob.getBlobId();
+                String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+                BlobId target = BlobId.of(bucket, targetName);
+                guardian.checkWriteAccess(bucket, targetName);
+                CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+                gcsClient.copy(copyReq);
+            }
+            return null;
+        });
     }
 
     @Override
@@ -206,17 +224,16 @@
         }
 
         List<StorageBatchResult<Boolean>> deleteResponses = new ArrayList<>();
-        StorageBatch batchRequest;
         Iterator<String> pathIter = paths.iterator();
         while (pathIter.hasNext()) {
-            batchRequest = gcsClient.batch();
+            StorageBatch batchRequest = gcsClient.batch();
             for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
                 BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next());
                 guardian.checkWriteAccess(bucket, blobId.getName());
                 deleteResponses.add(batchRequest.delete(blobId));
             }
-
-            batchRequest.submit();
+            // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread
+            runOpInterruptibly(batchRequest::submit);
             Iterator<String> deletePathIter = paths.iterator();
             for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) {
                 String deletedPath = deletePathIter.next();
@@ -240,11 +257,12 @@
     }
 
     @Override
-    public long getObjectSize(String bucket, String path) {
+    public long getObjectSize(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob =
-                gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
+        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
         if (blob == null) {
             return 0;
         }
@@ -252,19 +270,22 @@
     }
 
     @Override
-    public boolean exists(String bucket, String path) {
+    public boolean exists(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
-                Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
+        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.values())));
         return blob != null && blob.exists();
     }
 
     @Override
-    public boolean isEmptyPrefix(String bucket, String path) {
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+        Page<Blob> blobs =
+                runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)));
         return !blobs.iterateAll().iterator().hasNext();
     }
 
@@ -275,10 +296,12 @@
     }
 
     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
         guardian.checkReadAccess(bucket, "/");
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+        Page<Blob> blobs =
+                runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)));
         ArrayNode objectsInfo = objectMapper.createArrayNode();
 
         List<Blob> objects = new ArrayList<>();
@@ -313,4 +336,24 @@
     private String stripCloudPrefix(String objectName) {
         return objectName.substring(config.getPrefix().length());
     }
+
+    private void runOpInterruptibly(Runnable operation) throws HyracksDataException {
+        try {
+            executor.submit(operation).get();
+        } catch (InterruptedException e) {
+            throw HyracksDataException.create(e);
+        } catch (ExecutionException e) {
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
+
+    private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException {
+        try {
+            return executor.submit(operation::get).get();
+        } catch (InterruptedException e) {
+            throw HyracksDataException.create(e);
+        } catch (ExecutionException e) {
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0d30120..cbbaf31 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -33,6 +33,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 
 import com.google.api.gax.paging.Page;
@@ -50,7 +51,6 @@
 
 public class GCSParallelDownloader implements IParallelDownloader {
 
-    //    private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final IOManager ioManager;
     private final Storage gcsClient;
@@ -95,6 +95,7 @@
             downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
                     downConfig.setDownloadDirectory(entry.getKey()).build()));
         }
+        // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
         downloadJobs.forEach(DownloadJob::getDownloadResults);
     }
 
@@ -121,6 +122,7 @@
         }
         List<DownloadResult> results;
         for (DownloadJob job : downloadJobs) {
+            // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
             results = job.getDownloadResults();
             for (DownloadResult result : results) {
                 if (result.getStatus() != TransferStatus.SUCCESS) {
@@ -134,12 +136,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        try {
-            transferManager.close();
-            gcsClient.close();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
+        InvokeUtil.tryWithCleanupsAsHyracks(transferManager::close, gcsClient::close);
     }
 
     private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd5..f240ad4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -108,9 +108,10 @@
         profiler.objectMultipartUpload();
         try {
             writer.close();
-            writer = null;
         } catch (IOException | RuntimeException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            writer = null;
         }
         log("FINISHED");
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 63a8366..0aa1f87 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -65,7 +65,8 @@
     ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
-                ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+                ICloudGuardian.NoOpCloudGuardian.INSTANCE,
+                appCtx.getServiceContext().getControllerService().getExecutor());
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index d89c872..2f64c37 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.gcs;
 
+import java.util.concurrent.Executors;
+
 import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -52,7 +54,8 @@
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
         GCSClientConfig config =
                 new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
-        CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+        CLOUD_CLIENT =
+                new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
     }
 
     private static void cleanup() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
index 73a1392..88e88ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -34,7 +34,7 @@
      * @return the systems state in case the checkpoint upon calling {@link IRecoveryManager#getSystemState()}
      * is missing the checkpoint file
      */
-    IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint();
+    IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException;
 
     /**
      * Bootstraps the node's partitions directory by doing the following:
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index aef2215..a5f79ac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -61,7 +61,7 @@
      * @return SystemState The state of the system
      * @throws ACIDException
      */
-    SystemState getSystemState() throws ACIDException;
+    SystemState getSystemState() throws ACIDException, HyracksDataException;
 
     /**
      * Rolls back a transaction.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index fdda793..46c8ec2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -45,6 +45,8 @@
             return (HyracksDataException) cause;
         } else if (cause instanceof InterruptedException) {
             Thread.currentThread().interrupt();
+        } else if (cause instanceof Error) {
+            throw (Error) cause;
         }
         return new HyracksDataException(cause);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index e7970d3..ff0a3c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -216,6 +216,39 @@
 
     @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
     // catching Throwable, instanceofs, false-positive unreachable code
+    public static void tryWithCleanupsAsHyracks(ThrowingAction action, ThrowingAction... cleanups)
+            throws HyracksDataException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (ThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted |= t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        throw HyracksDataException.create(savedT);
+    }
+
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
     public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
         Throwable savedT = null;
         boolean suppressedInterrupted = false;