[NO ISSUE][HYR] allow override of logging for retryUntilSuccessOrExhausted

Change-Id: I63b81841059fb2eac174e70d631352fdf685e01f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3179
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 9413d1b..ffd2956 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -40,6 +40,10 @@
 public class InvokeUtil {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final IFailedAttemptCallback defaultFailureCallback =
+            (action, attempt, isFinal, span, failure) -> LOGGER.log(Level.WARN,
+                    "failure executing action {} (attempt: {}{})", action, attempt, isFinal ? "" : ", will retry",
+                    failure);
 
     private InvokeUtil() {
     }
@@ -258,27 +262,38 @@
 
     public static <T> T retryUntilSuccessOrExhausted(Span span, ComputingAction<T> action, IRetryPolicy policy,
             IDelay delay) throws HyracksDataException {
+        return retryUntilSuccessOrExhausted(span, action, policy, delay, defaultFailureCallback);
+    }
+
+    public static <T> T retryUntilSuccessOrExhausted(Span span, ComputingAction<T> action, IRetryPolicy policy,
+            IDelay delay, IFailedAttemptCallback onFailure) throws HyracksDataException {
         Throwable failure;
         int attempt = 0;
-        do {
+        while (true) {
             attempt++;
             try {
                 return action.compute();
             } catch (Throwable th) {
                 failure = th;
-                if (!policy.retry(th)) {
-                    break;
-                }
                 try {
-                    LOGGER.log(Level.WARN, "Failure executing action {} for the {} time", action, attempt, failure);
-                    span.sleep(delay.calculate(attempt), TimeUnit.MILLISECONDS);
+                    long delayMs = delay.calculate(attempt);
+                    if (!policy.retry(th) || span.remaining(TimeUnit.MILLISECONDS) < delayMs) {
+                        onFailure.attemptFailed(action, attempt, true, span, failure);
+                        throw HyracksDataException.create(failure);
+                    } else {
+                        onFailure.attemptFailed(action, attempt, false, span, failure);
+                    }
+                    span.sleep(delayMs, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw HyracksDataException.create(e);
                 }
             }
-        } while (!span.elapsed());
-        LOGGER.log(Level.WARN, "Final Failure executing action {} after {} attempts", action, attempt, failure);
-        throw HyracksDataException.create(failure);
+        }
+    }
+
+    @FunctionalInterface
+    public interface IFailedAttemptCallback {
+        void attemptFailed(ComputingAction<?> action, int attempt, boolean isFinal, Span span, Throwable failure);
     }
 }