[NO ISSUE][HYR] Retry cancelling tasks only after the timeout

- user model changes: no
- storage format changes: no
- interface changes: no

Instead of retrying the tasks cancellation on both
the 5 minutes timeout or interrupt, retry only after the timeout
to avoid retrying with every interrupt that can happen to
the Super Activity.

Ext-ref: MB-65432

Change-Id: Ie585127fe30904f5126bae8867b94ea12cd45762
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19466
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index e031284..a4dd9db 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -52,6 +52,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.util.Span;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -282,25 +284,27 @@
     }
 
     private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
-        boolean interrupted = Thread.interrupted();
-        try {
-            while (true) {
-                for (Future<Void> task : tasks) {
-                    task.cancel(true);
-                }
-                try {
-                    if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
-                        return true;
-                    }
-                    LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
-                } catch (InterruptedException e) {
-                    interrupted = true;
-                }
+        Span retryWait = Span.init(5, TimeUnit.MINUTES);
+        while (true) {
+            for (Future<Void> task : tasks) {
+                task.cancel(true);
             }
-        } finally {
-            if (interrupted) {
-                Thread.currentThread().interrupt();
+            if (acquireUninterruptibly(completeSemaphore, retryWait)) {
+                return true;
             }
+            LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
         }
     }
+
+    private static boolean acquireUninterruptibly(Semaphore completeSemaphore, Span s) {
+        s.reset();
+        return InvokeUtil.getUninterruptibly(() -> {
+            while (!s.elapsed()) {
+                if (completeSemaphore.tryAcquire(s.remaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)) {
+                    return true;
+                }
+            }
+            return false;
+        });
+    }
 }