[NO ISSUE][HYR] allow override of logging for retryUntilSuccessOrExhausted
Change-Id: I63b81841059fb2eac174e70d631352fdf685e01f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3179
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 9413d1b..ffd2956 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -40,6 +40,10 @@
public class InvokeUtil {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final IFailedAttemptCallback defaultFailureCallback =
+ (action, attempt, isFinal, span, failure) -> LOGGER.log(Level.WARN,
+ "failure executing action {} (attempt: {}{})", action, attempt, isFinal ? "" : ", will retry",
+ failure);
private InvokeUtil() {
}
@@ -258,27 +262,38 @@
public static <T> T retryUntilSuccessOrExhausted(Span span, ComputingAction<T> action, IRetryPolicy policy,
IDelay delay) throws HyracksDataException {
+ return retryUntilSuccessOrExhausted(span, action, policy, delay, defaultFailureCallback);
+ }
+
+ public static <T> T retryUntilSuccessOrExhausted(Span span, ComputingAction<T> action, IRetryPolicy policy,
+ IDelay delay, IFailedAttemptCallback onFailure) throws HyracksDataException {
Throwable failure;
int attempt = 0;
- do {
+ while (true) {
attempt++;
try {
return action.compute();
} catch (Throwable th) {
failure = th;
- if (!policy.retry(th)) {
- break;
- }
try {
- LOGGER.log(Level.WARN, "Failure executing action {} for the {} time", action, attempt, failure);
- span.sleep(delay.calculate(attempt), TimeUnit.MILLISECONDS);
+ long delayMs = delay.calculate(attempt);
+ if (!policy.retry(th) || span.remaining(TimeUnit.MILLISECONDS) < delayMs) {
+ onFailure.attemptFailed(action, attempt, true, span, failure);
+ throw HyracksDataException.create(failure);
+ } else {
+ onFailure.attemptFailed(action, attempt, false, span, failure);
+ }
+ span.sleep(delayMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
}
}
- } while (!span.elapsed());
- LOGGER.log(Level.WARN, "Final Failure executing action {} after {} attempts", action, attempt, failure);
- throw HyracksDataException.create(failure);
+ }
+ }
+
+ @FunctionalInterface
+ public interface IFailedAttemptCallback {
+ void attemptFailed(ComputingAction<?> action, int attempt, boolean isFinal, Span span, Throwable failure);
}
}