[NO ISSUE][HYR][RT] Log pre-cancel thread stack on stuck canceled tasks, some interrupt fixes

Ext-ref: MB-65432
Change-Id: I926d16500f404397875c2401a458015f1636e21a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19470
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index c7d2d94..c394220 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
     protected IFrameWriter writer;
@@ -53,7 +54,7 @@
         try {
             fail();
         } catch (Throwable th) {
-            failure.addSuppressed(th);
+            ExceptionUtils.suppress(failure, th);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 338c799..856979c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -37,6 +37,7 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -289,6 +290,10 @@
     }
 
     private boolean cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore completeSemaphore) {
+        Map<Thread, StackTraceElement[]> preCancelStackTraces = new HashMap<>();
+        for (Thread runningThread : runningThreads) {
+            preCancelStackTraces.put(runningThread, runningThread.getStackTrace());
+        }
         for (Future<Void> task : tasks) {
             task.cancel(true);
         }
@@ -298,8 +303,16 @@
             if (completionPoll.tryAcquireUninterruptibly(completeSemaphore)) {
                 return true;
             }
-            LOGGER.warn("Tasks of job {} did not complete within {}; interrupting running threads {}",
-                    ctx.getJobletContext().getJobId(), completionPoll, runningThreads);
+            preCancelStackTraces.keySet().removeIf(Predicate.not(runningThreads::contains));
+            preCancelStackTraces.forEach((thread, stack) -> {
+                Throwable t = new Throwable(thread.getName() + "pre-interrupt stack");
+                t.setStackTrace(stack);
+                LOGGER.warn("Task of job {} did not complete within {}: ", ctx.getJobletContext().getJobId(),
+                        completionPoll, t);
+            });
+            for (Thread runningThread : runningThreads) {
+                preCancelStackTraces.put(runningThread, runningThread.getStackTrace());
+            }
             interruptRunningThreads(runningThreads);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 5ebfba4..9778a30 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -96,9 +96,7 @@
             } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
                 // NOSONAR ignore logging failure
             }
-            if (root != null) {
-                root.addSuppressed(th);
-            }
+            ExceptionUtils.suppress(root, th);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index be8be9c..9e3d5c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -85,7 +85,9 @@
                 } catch (Throwable e) {
                     // First, clear the interrupted flag
                     interrupted |= Thread.interrupted();
-                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                    if (ExceptionUtils.causedByInterrupt(e)) {
+                        interrupted = true;
+                    } else {
                         // The cause isn't an interruption, rethrow
                         throw e;
                     }
@@ -130,7 +132,9 @@
                 } catch (Throwable e) {
                     // First, clear the interrupted flag
                     interrupted |= Thread.interrupted();
-                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                    if (ExceptionUtils.causedByInterrupt(e)) {
+                        interrupted = true;
+                    } else {
                         // The cause isn't an interruption, rethrow
                         throw e;
                     }
@@ -179,7 +183,11 @@
                 if (retryPolicy == null) {
                     retryPolicy = new ExponentialRetryPolicy(NUMBER_OF_RETRIES, MAX_DELAY_BETWEEN_RETRIES);
                 }
-                if (!retryPolicy.retry(e)) {
+                if (ExceptionUtils.causedByInterrupt(e) && !Thread.currentThread().isInterrupted()) {
+                    LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e);
+                    Thread.currentThread().interrupt();
+                }
+                if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
                     throw HyracksDataException.create(e);
                 }
                 attempt++;