[NO ISSUE][HYR] Keep trying to cancel job tasks

- user model changes: no
- storage format changes: no
- interface changes: no

When cancelling a job's tasks by interrupting them, sometimes a task
would not respond to the interrupt and continues to work/wait causing
the job cancellation to get stuck. Typically, a task should respond to
the interrupt, however there have been cases where the interrupt status
is cleared due to some ill parts of the task. To account for such cases,
keep trying to cancel the tasks to set the interrupt status again.
Log warning since this is typically a code issue.

Ext-ref: MB-65432

Change-Id: I51fccbceeed0222aeedbaa7b6f138f3ed3e7c44d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19465
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 17f5cb1..e031284 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -51,12 +52,17 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
  * connected activities in a single thread.
  */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private static final String CLASS_ABBREVIATION = "SAO";
     private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
     private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
@@ -264,13 +270,37 @@
     }
 
     private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+        boolean cancelCompleted = false;
         try {
             startSemaphore.acquireUninterruptibly();
-            for (Future<Void> task : tasks) {
-                task.cancel(true);
+            cancelCompleted = cancelTasks(tasks, completeSemaphore);
+        } finally {
+            if (!cancelCompleted) {
+                completeSemaphore.acquireUninterruptibly();
+            }
+        }
+    }
+
+    private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                for (Future<Void> task : tasks) {
+                    task.cancel(true);
+                }
+                try {
+                    if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
+                        return true;
+                    }
+                    LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
             }
         } finally {
-            completeSemaphore.acquireUninterruptibly();
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 }