[NO ISSUE][HYR][RT] Log pre-cancel thread stack on stuck canceled tasks, some interrupt fixes
Ext-ref: MB-65432
Change-Id: I926d16500f404397875c2401a458015f1636e21a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19470
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index c7d2d94..c394220 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
public abstract class AbstractOneInputPushRuntime implements IPushRuntime {
protected IFrameWriter writer;
@@ -53,7 +54,7 @@
try {
fail();
} catch (Throwable th) {
- failure.addSuppressed(th);
+ ExceptionUtils.suppress(failure, th);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 338c799..856979c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -37,6 +37,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -289,6 +290,10 @@
}
private boolean cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore completeSemaphore) {
+ Map<Thread, StackTraceElement[]> preCancelStackTraces = new HashMap<>();
+ for (Thread runningThread : runningThreads) {
+ preCancelStackTraces.put(runningThread, runningThread.getStackTrace());
+ }
for (Future<Void> task : tasks) {
task.cancel(true);
}
@@ -298,8 +303,16 @@
if (completionPoll.tryAcquireUninterruptibly(completeSemaphore)) {
return true;
}
- LOGGER.warn("Tasks of job {} did not complete within {}; interrupting running threads {}",
- ctx.getJobletContext().getJobId(), completionPoll, runningThreads);
+ preCancelStackTraces.keySet().removeIf(Predicate.not(runningThreads::contains));
+ preCancelStackTraces.forEach((thread, stack) -> {
+ Throwable t = new Throwable(thread.getName() + " pre-interrupt stack"); // carrier so the logger prints the previously captured stack
+ t.setStackTrace(stack);
+ LOGGER.warn("Task of job {} did not complete within {}: ", ctx.getJobletContext().getJobId(),
+ completionPoll, t);
+ });
+ for (Thread runningThread : runningThreads) {
+ preCancelStackTraces.put(runningThread, runningThread.getStackTrace());
+ }
interruptRunningThreads(runningThreads);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 5ebfba4..9778a30 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -96,9 +96,7 @@
} catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
// NOSONAR ignore logging failure
}
- if (root != null) {
- root.addSuppressed(th);
- }
+ ExceptionUtils.suppress(root, th);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index be8be9c..9e3d5c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -85,7 +85,9 @@
} catch (Throwable e) {
// First, clear the interrupted flag
interrupted |= Thread.interrupted();
- if (!ExceptionUtils.causedByInterrupt(e)) {
+ if (ExceptionUtils.causedByInterrupt(e)) {
+ interrupted = true;
+ } else {
// The cause isn't an interruption, rethrow
throw e;
}
@@ -130,7 +132,9 @@
} catch (Throwable e) {
// First, clear the interrupted flag
interrupted |= Thread.interrupted();
- if (!ExceptionUtils.causedByInterrupt(e)) {
+ if (ExceptionUtils.causedByInterrupt(e)) {
+ interrupted = true;
+ } else {
// The cause isn't an interruption, rethrow
throw e;
}
@@ -179,7 +183,11 @@
if (retryPolicy == null) {
retryPolicy = new ExponentialRetryPolicy(NUMBER_OF_RETRIES, MAX_DELAY_BETWEEN_RETRIES);
}
- if (!retryPolicy.retry(e)) {
+ if (ExceptionUtils.causedByInterrupt(e) && !Thread.currentThread().isInterrupted()) {
+ LOGGER.warn("Lost suppressed interrupt during ICloudReturnableRequest", e);
+ Thread.currentThread().interrupt();
+ }
+ if (Thread.currentThread().isInterrupted() || !retryPolicy.retry(e)) {
throw HyracksDataException.create(e);
}
attempt++;