[ASTERIXDB-3570][RT] Re-interrupt running threads on tasks cancelation

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Re-interrupt running threads on tasks cancelation when threads
  don't finish within some period of time.

Ext-ref: MB-65432
Change-Id: I7d056402343e9f80610fef88fda8ca3cda729f04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19469
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83d0eb8..338c799 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -31,6 +31,7 @@
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -65,6 +66,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private static final String CLASS_ABBREVIATION = "SAO";
+    private static final long TASKS_COMPLETION_POLL_SECONDS = TimeUnit.MINUTES.toSeconds(2);
     private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
     private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
     private final Map<ActivityId, IActivity> startActivities;
@@ -226,6 +228,7 @@
 
     private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
         List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+        Set<Thread> runningThreads = ConcurrentHashMap.newKeySet();
         Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
         final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
         final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
@@ -233,6 +236,7 @@
         try {
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
                 tasks.add(ctx.getExecutorService().submit(() -> {
+                    runningThreads.add(Thread.currentThread());
                     startSemaphore.release();
                     try {
                         Thread.currentThread().setName(CLASS_ABBREVIATION + ":" + ctx.getJobletContext().getJobId()
@@ -243,6 +247,7 @@
                         throw th;
                     } finally {
                         ctx.unsubscribeThreadFromStats();
+                        runningThreads.remove(Thread.currentThread());
                         completeSemaphore.release();
                     }
                     return null;
@@ -264,17 +269,18 @@
         }
         if (root != null) {
             final Throwable failure = root;
-            cancelTasks(tasks, startSemaphore, completeSemaphore);
+            cancelTasks(tasks, runningThreads, startSemaphore, completeSemaphore);
             failures.forEach(t -> ExceptionUtils.suppress(failure, t));
             throw HyracksDataException.create(failure);
         }
     }
 
-    private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+    private void cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore startSemaphore,
+            Semaphore completeSemaphore) {
         boolean cancelCompleted = false;
         try {
             startSemaphore.acquireUninterruptibly();
-            cancelCompleted = cancelTasks(tasks, completeSemaphore);
+            cancelCompleted = cancelTasks(tasks, runningThreads, completeSemaphore);
         } finally {
             if (!cancelCompleted) {
                 completeSemaphore.acquireUninterruptibly();
@@ -282,17 +288,29 @@
         }
     }
 
-    private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
-        Span retryWait = Span.init(5, TimeUnit.MINUTES);
+    private boolean cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore completeSemaphore) {
+        for (Future<Void> task : tasks) {
+            task.cancel(true);
+        }
+        Span completionPoll = Span.init(TASKS_COMPLETION_POLL_SECONDS, TimeUnit.SECONDS);
         while (true) {
-            for (Future<Void> task : tasks) {
-                task.cancel(true);
-            }
-            retryWait.reset();
-            if (retryWait.tryAcquireUninterruptibly(completeSemaphore)) {
+            completionPoll.reset();
+            if (completionPoll.tryAcquireUninterruptibly(completeSemaphore)) {
                 return true;
             }
-            LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
+            LOGGER.warn("Tasks of job {} did not complete within {}; interrupting running threads {}",
+                    ctx.getJobletContext().getJobId(), completionPoll, runningThreads);
+            interruptRunningThreads(runningThreads);
+        }
+    }
+
+    private static void interruptRunningThreads(Set<Thread> threads) {
+        for (Thread thread : threads) {
+            try {
+                thread.interrupt();
+            } catch (Throwable t) {
+                LOGGER.debug("failed to interrupt thread {}", thread, t);
+            }
         }
     }
 }