[NO ISSUE] Add Invoke.runUninterruptible
Add helper method to clear interruption and restore around invocation of an
action
Change-Id: I2f0a5dcb36bfc43168498b9a0a2a4afccedfc4f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2415
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 9b356a0..ba4f82d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -39,7 +39,7 @@
* Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
* completes, the current thread will be re-interrupted, if the original operation was interrupted.
*/
- public static void doUninterruptibly(Interruptible interruptible) {
+ public static void doUninterruptibly(InterruptibleAction interruptible) {
boolean interrupted = false;
try {
while (true) {
@@ -58,10 +58,10 @@
}
/**
- * Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
+ * Executes the passed action, retrying if the operation is interrupted. Once the interruptible
* completes, the current thread will be re-interrupted, if the original operation was interrupted.
*/
- public static void doExUninterruptibly(ThrowingInterruptible interruptible) throws Exception {
+ public static void doExUninterruptibly(ThrowingAction interruptible) throws Exception {
boolean interrupted = false;
try {
while (true) {
@@ -84,7 +84,7 @@
*
* @return true if the original operation was interrupted, otherwise false
*/
- public static boolean doUninterruptiblyGet(Interruptible interruptible) {
+ public static boolean doUninterruptiblyGet(InterruptibleAction interruptible) {
boolean interrupted = false;
while (true) {
try {
@@ -103,12 +103,12 @@
*
* @return true if the original operation was interrupted, otherwise false
*/
- public static boolean doExUninterruptiblyGet(ThrowingInterruptible interruptible) throws Exception {
+ public static boolean doExUninterruptiblyGet(Callable<Void> interruptible) throws Exception {
boolean interrupted = false;
boolean success = false;
while (true) {
try {
- interruptible.run();
+ interruptible.call();
success = true;
break;
} catch (InterruptedException e) { // NOSONAR- contract states caller must handle
@@ -154,7 +154,7 @@
* {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if
* the original operation was interrupted.
*/
- public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException {
+ public static void doIoUninterruptibly(IOInterruptibleAction interruptible) throws IOException {
boolean interrupted = false;
try {
while (true) {
@@ -175,8 +175,7 @@
}
@SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception
- public static void tryWithCleanups(ThrowingInterruptible action, ThrowingInterruptible... cleanups)
- throws Exception {
+ public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
Throwable savedT = null;
boolean suppressedInterrupted = false;
try {
@@ -184,7 +183,7 @@
} catch (Throwable t) {
savedT = t;
} finally {
- for (ThrowingInterruptible cleanup : cleanups) {
+ for (ThrowingAction cleanup : cleanups) {
try {
cleanup.run();
} catch (Throwable t) {
@@ -212,18 +211,39 @@
}
}
+ /**
+ * Runs the supplied action, after suspending any pending interruption. An error will be logged if
+ * the action is itself interrupted.
+ */
+ public static void runUninterruptible(ThrowingAction action) throws Exception {
+ boolean interrupted = Thread.interrupted();
+ try {
+ action.run();
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("uninterruptible action {} was interrupted!", action, e);
+ interrupted = true;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
@FunctionalInterface
- public interface Interruptible {
+ public interface InterruptibleAction {
void run() throws InterruptedException;
}
@FunctionalInterface
- public interface ThrowingInterruptible {
+ public interface ThrowingAction {
void run() throws Exception; // NOSONAR
}
@FunctionalInterface
- public interface ThrowingIOInterruptible {
+ public interface IOInterruptibleAction {
void run() throws IOException, InterruptedException;
}
}