[NO ISSUE][*DB][CLOUD] Cancel running GCS ops on interrupt
Ext-ref: MB-65432
Change-Id: I840f300f11a5bc2676cd4b542bda40cfb78e64e4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19468
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 99cda9e..16fb278 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -31,8 +31,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -348,12 +350,30 @@
}
private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException {
+ Future<T> opTask = executor.submit(operation::get);
try {
- return executor.submit(operation::get).get();
+ return opTask.get();
} catch (InterruptedException e) {
+ cancelAndUnwind(opTask);
throw HyracksDataException.create(e);
} catch (ExecutionException e) {
throw HyracksDataException.create(e.getCause());
}
}
+
+ private static <T> void cancelAndUnwind(Future<T> opTask) {
+ opTask.cancel(true);
+ while (true) {
+ try {
+ opTask.get();
+ } catch (InterruptedException e1) {
+ continue;
+ } catch (CancellationException e1) {
+ LOGGER.debug("ignoring exception after cancel of op", e1);
+ } catch (ExecutionException e1) {
+ LOGGER.debug("ignoring exception after cancel of op", e1.getCause());
+ }
+ return;
+ }
+ }
}