[NO ISSUE][HYR] Retry cancelling tasks only after the timeout
- user model changes: no
- storage format changes: no
- interface changes: no
Instead of retrying the tasks cancellation on both
the 5 minutes timeout or interrupt, retry only after the timeout
to avoid retrying with every interrupt that can happen to
the Super Activity.
Ext-ref: MB-65432
Change-Id: Ie585127fe30904f5126bae8867b94ea12cd45762
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19466
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index e031284..a4dd9db 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -52,6 +52,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.util.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -282,25 +284,27 @@
}
private static boolean cancelTasks(List<Future<Void>> tasks, Semaphore completeSemaphore) {
- boolean interrupted = Thread.interrupted();
- try {
- while (true) {
- for (Future<Void> task : tasks) {
- task.cancel(true);
- }
- try {
- if (completeSemaphore.tryAcquire(5, TimeUnit.MINUTES)) {
- return true;
- }
- LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
- } catch (InterruptedException e) {
- interrupted = true;
- }
+ Span retryWait = Span.init(5, TimeUnit.MINUTES);
+ while (true) {
+ for (Future<Void> task : tasks) {
+ task.cancel(true);
}
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
+ if (acquireUninterruptibly(completeSemaphore, retryWait)) {
+ return true;
}
+ LOGGER.warn("not all tasks were cancelled within 5 minutes. retrying cancelling...");
}
}
+
+ private static boolean acquireUninterruptibly(Semaphore completeSemaphore, Span s) {
+ s.reset();
+ return InvokeUtil.getUninterruptibly(() -> {
+ while (!s.elapsed()) {
+ if (completeSemaphore.tryAcquire(s.remaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS)) {
+ return true;
+ }
+ }
+ return false;
+ });
+ }
}