[ASTERIXDB-3570][RT] Re-interrupt running threads on task cancellation
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- On task cancellation, re-interrupt the threads that are still running
when the tasks don't complete within the completion poll period
(2 minutes), and repeat until all tasks have completed.
Ext-ref: MB-65432
Change-Id: I7d056402343e9f80610fef88fda8ca3cda729f04
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19469
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83d0eb8..338c799 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -65,6 +66,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final String CLASS_ABBREVIATION = "SAO";
+ private static final long TASKS_COMPLETION_POLL_SECONDS = TimeUnit.MINUTES.toSeconds(2);
private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<>();
private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<>();
private final Map<ActivityId, IActivity> startActivities;
@@ -226,6 +228,7 @@
private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+ Set<Thread> runningThreads = ConcurrentHashMap.newKeySet();
Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
@@ -233,6 +236,7 @@
try {
for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
tasks.add(ctx.getExecutorService().submit(() -> {
+ runningThreads.add(Thread.currentThread());
startSemaphore.release();
try {
Thread.currentThread().setName(CLASS_ABBREVIATION + ":" + ctx.getJobletContext().getJobId()
@@ -243,6 +247,7 @@
throw th;
} finally {
ctx.unsubscribeThreadFromStats();
+ runningThreads.remove(Thread.currentThread());
completeSemaphore.release();
}
return null;
@@ -264,17 +269,18 @@
}
if (root != null) {
final Throwable failure = root;
- cancelTasks(tasks, startSemaphore, completeSemaphore);
+ cancelTasks(tasks, runningThreads, startSemaphore, completeSemaphore);
failures.forEach(t -> ExceptionUtils.suppress(failure, t));
throw HyracksDataException.create(failure);
}
}
- private void cancelTasks(List<Future<Void>> tasks, Semaphore startSemaphore, Semaphore completeSemaphore) {
+ private void cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore startSemaphore,
+ Semaphore completeSemaphore) {
boolean cancelCompleted = false;
try {
startSemaphore.acquireUninterruptibly();
- cancelCompleted = cancelTasks(tasks, completeSemaphore);
+ cancelCompleted = cancelTasks(tasks, runningThreads, completeSemaphore);
} finally {
if (!cancelCompleted) {
completeSemaphore.acquireUninterruptibly();
@@ -282,17 +288,29 @@
}
}
- private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
- Span retryWait = Span.init(5, TimeUnit.MINUTES);
+ private boolean cancelTasks(List<Future<Void>> tasks, Set<Thread> runningThreads, Semaphore completeSemaphore) {
+ for (Future<Void> task : tasks) {
+ task.cancel(true);
+ }
+ Span completionPoll = Span.init(TASKS_COMPLETION_POLL_SECONDS, TimeUnit.SECONDS);
while (true) {
- for (Future<Void> task : tasks) {
- task.cancel(true);
- }
- retryWait.reset();
- if (retryWait.tryAcquireUninterruptibly(completeSemaphore)) {
+ completionPoll.reset();
+ if (completionPoll.tryAcquireUninterruptibly(completeSemaphore)) {
return true;
}
- LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
+ LOGGER.warn("Tasks of job {} did not complete within {}; interrupting running threads {}",
+ ctx.getJobletContext().getJobId(), completionPoll, runningThreads);
+ interruptRunningThreads(runningThreads);
+ }
+ }
+
+ private static void interruptRunningThreads(Set<Thread> threads) { // best-effort interrupt of each thread
+ for (Thread thread : threads) {
+ try {
+ thread.interrupt();
+ } catch (Throwable t) { // per-thread catch: loop continues with the next thread
+ LOGGER.debug("failed to interrupt thread {}", thread, t); // logged only; not rethrown
+ }
}
}
}