[NO ISSUE][ING] Improvements to active retry policy
- user model changes: no
- storage format changes: no
- interface changes: yes
- IRetryPolicy.retry now takes a Throwable parameter
Details:
- This change improves the retry policy for active
entities by providing the failure causing the last
failure.
- The change also removes the lock over the active
notification handler on the recover call.
Change-Id: I4246e2a276e1f80569a07630e182aab8f49dd115
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2665
Reviewed-by: Michael Blow <mblow@apache.org>
Contrib: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
index 6633810..b964430 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
@@ -28,7 +28,7 @@
}
@Override
- public boolean retry() {
+ public boolean retry(Throwable failure) {
if (attempted < count) {
attempted++;
return true;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
index a010984..1daf07e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
@@ -21,7 +21,9 @@
@FunctionalInterface
public interface IRetryPolicy {
/**
+ * @param failure
+ * the cause of the active entity failure
* @return true if one more attempt should be done
*/
- boolean retry();
+ boolean retry(Throwable failure);
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
index 074f8f4..fde67e6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
@@ -27,7 +27,7 @@
}
@Override
- public boolean retry() {
+ public boolean retry(Throwable failure) {
synchronized (listener) {
try {
listener.wait(5000); //NOSONAR this method is being called in a while loop
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
index 1596c17..a48283a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
@@ -20,7 +20,7 @@
public class NoRetryPolicyFactory implements IRetryPolicyFactory {
public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory();
- private static final IRetryPolicy policy = () -> false;
+ private static final IRetryPolicy policy = failure -> false;
private NoRetryPolicyFactory() {
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 053e6cd..1b7d5b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -207,9 +207,9 @@
}
@Override
- public synchronized void recover() {
+ public void recover() {
LOGGER.log(level, "Starting active recovery");
- for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
+ for (IActiveEntityEventsListener listener : getEventListeners()) {
synchronized (listener) {
LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats());
listener.notifyAll();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 2de8319..1f72856 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -93,6 +93,7 @@
synchronized (listener) {
if (!cancelRecovery) {
listener.setState(ActivityState.PERMANENTLY_FAILED);
+ listener.setRunning(metadataProvider, false);
}
}
} else {
@@ -112,7 +113,7 @@
return null;
}
LOGGER.log(level, "calling the policy");
- while (policy.retry()) {
+ while (policy.retry(failure)) {
synchronized (listener) {
if (cancelRecovery) {
return null;
@@ -170,7 +171,9 @@
return null;
}
if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
listener.setState(ActivityState.PERMANENTLY_FAILED);
+ listener.setRunning(metadataProvider, false);
}
listener.notifyAll();
}
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
index c4b23ef..8367fa0 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
@@ -213,6 +213,8 @@
return aOrderedListToString((AOrderedList) aObj);
case STRING:
return ((AString) aObj).getStringValue();
+ case BOOLEAN:
+ return ((ABoolean) aObj).getBoolean().toString();
default:
throw new AlgebricksException("value of type " + aObj.getType() + " is not supported yet");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index cfa6f78..eaec3c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -84,8 +84,8 @@
private volatile long reqId = 0L;
- private final ExecutorService uninterruptibleExecutor = Executors.newFixedThreadPool(2,
- r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName()));
+ private final ExecutorService uninterruptibleExecutor =
+ Executors.newFixedThreadPool(2, r -> new Thread(r, "HyracksConnection Uninterrubtible thread: "));
private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1);
@@ -367,6 +367,11 @@
return null;
}
+ @Override
+ public String toString() {
+ return "CancelJobRequest: " + jobId.toString();
+ }
+
}
private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> {
@@ -407,24 +412,35 @@
}
}
+ @Override
+ public String toString() {
+ return "StartJobRequest";
+ }
+
}
private class UninterrubtileRequestHandler implements Runnable {
@SuppressWarnings({ "squid:S2189", "squid:S2142" })
@Override
public void run() {
- while (true) {
- try {
- UnInterruptibleRequest<?> next = uninterruptibles.take();
- reqId++;
- running = true;
- next.handle();
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
- continue;
- } finally {
- running = false;
+ String nameBefore = Thread.currentThread().getName();
+ Thread.currentThread().setName(nameBefore + getClass().getSimpleName());
+ try {
+ while (true) {
+ try {
+ UnInterruptibleRequest<?> current = uninterruptibles.take();
+ reqId++;
+ running = true;
+ current.handle();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ } finally {
+ running = false;
+ }
}
+ } finally {
+ Thread.currentThread().setName(nameBefore);
}
}
}
@@ -433,25 +449,31 @@
@Override
@SuppressWarnings({ "squid:S2189", "squid:S2142" })
public void run() {
- long currentReqId = 0L;
- long currentTime = System.nanoTime();
- while (true) {
- try {
- TimeUnit.MINUTES.sleep(1);
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
- continue;
- }
- if (running) {
- if (reqId == currentReqId) {
- if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
- ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+ String nameBefore = Thread.currentThread().getName();
+ Thread.currentThread().setName(nameBefore + getClass().getSimpleName());
+ try {
+ long currentReqId = 0L;
+ long currentTime = System.nanoTime();
+ while (true) {
+ try {
+ TimeUnit.MINUTES.sleep(1);
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ }
+ if (running) {
+ if (reqId == currentReqId) {
+ if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
+ ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+ }
+ } else {
+ currentReqId = reqId;
+ currentTime = System.nanoTime();
}
- } else {
- currentReqId = reqId;
- currentTime = System.nanoTime();
}
}
+ } finally {
+ Thread.currentThread().setName(nameBefore);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 5964c04..0f36c80 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -66,11 +66,11 @@
LOGGER.error("{} tasks associated with CC {} failed to complete after {}ms. Giving up",
runningTasks.size(), ccId, TIMEOUT);
logPendingTasks();
- ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
} catch (Throwable th) {
LOGGER.error("Failed to abort all previous tasks associated with CC {}", ccId, th);
- ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index c8b9112..8500842 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -33,7 +33,7 @@
public static final int EC_ABNORMAL_TERMINATION = 1;
public static final int EC_FAILED_TO_STARTUP = 2;
public static final int EC_FAILED_TO_RECOVER = 3;
- public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
+ public static final int EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5;
public static final int EC_UNHANDLED_EXCEPTION = 11;
public static final int EC_IMMEDIATE_HALT = 33;
@@ -75,8 +75,8 @@
exit(status);
}
- public static void halt(int status) {
- LOGGER.fatal("JVM halting with status " + status + "; bye!", new Throwable("halt stacktrace"));
+ public static synchronized void halt(int status) {
+ LOGGER.fatal("JVM halting with status {}; thread dump at halt: {}", status, ThreadDumpUtil.takeDumpString());
// try to give time for the log to be emitted...
LogManager.shutdown();
Runtime.getRuntime().halt(status);