[NO ISSUE][ING] Improvements to active retry policy

- user model changes: no
- storage format changes: no
- interface changes: yes
  - IRetryPolicy.retry now takes a Throwable parameter

Details:
- This change improves the retry policy for active
  entities by providing the failure causing the last
  failure.
- The change also removes the lock over the active
  notification handler on the recover call.

Change-Id: I4246e2a276e1f80569a07630e182aab8f49dd115
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2665
Reviewed-by: Michael Blow <mblow@apache.org>
Contrib: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
index 6633810..b964430 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
@@ -28,7 +28,7 @@
     }
 
     @Override
-    public boolean retry() {
+    public boolean retry(Throwable failure) {
         if (attempted < count) {
             attempted++;
             return true;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
index a010984..1daf07e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
@@ -21,7 +21,9 @@
 @FunctionalInterface
 public interface IRetryPolicy {
     /**
+     * @param failure
+     *            the cause of the active entity failure
      * @return true if one more attempt should be done
      */
-    boolean retry();
+    boolean retry(Throwable failure);
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
index 074f8f4..fde67e6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
@@ -27,7 +27,7 @@
     }
 
     @Override
-    public boolean retry() {
+    public boolean retry(Throwable failure) {
         synchronized (listener) {
             try {
                 listener.wait(5000); //NOSONAR this method is being called in a while loop
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
index 1596c17..a48283a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
@@ -20,7 +20,7 @@
 
 public class NoRetryPolicyFactory implements IRetryPolicyFactory {
     public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory();
-    private static final IRetryPolicy policy = () -> false;
+    private static final IRetryPolicy policy = failure -> false;
 
     private NoRetryPolicyFactory() {
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 053e6cd..1b7d5b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -207,9 +207,9 @@
     }
 
     @Override
-    public synchronized void recover() {
+    public void recover() {
         LOGGER.log(level, "Starting active recovery");
-        for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
+        for (IActiveEntityEventsListener listener : getEventListeners()) {
             synchronized (listener) {
                 LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats());
                 listener.notifyAll();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 2de8319..1f72856 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -93,6 +93,7 @@
                 synchronized (listener) {
                     if (!cancelRecovery) {
                         listener.setState(ActivityState.PERMANENTLY_FAILED);
+                        listener.setRunning(metadataProvider, false);
                     }
                 }
             } else {
@@ -112,7 +113,7 @@
             return null;
         }
         LOGGER.log(level, "calling the policy");
-        while (policy.retry()) {
+        while (policy.retry(failure)) {
             synchronized (listener) {
                 if (cancelRecovery) {
                     return null;
@@ -170,7 +171,9 @@
                     return null;
                 }
                 if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                    LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
                     listener.setState(ActivityState.PERMANENTLY_FAILED);
+                    listener.setRunning(metadataProvider, false);
                 }
                 listener.notifyAll();
             }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
index c4b23ef..8367fa0 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
@@ -213,6 +213,8 @@
                 return aOrderedListToString((AOrderedList) aObj);
             case STRING:
                 return ((AString) aObj).getStringValue();
+            case BOOLEAN:
+                return ((ABoolean) aObj).getBoolean().toString();
             default:
                 throw new AlgebricksException("value of type " + aObj.getType() + " is not supported yet");
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index cfa6f78..eaec3c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -84,8 +84,8 @@
 
     private volatile long reqId = 0L;
 
-    private final ExecutorService uninterruptibleExecutor = Executors.newFixedThreadPool(2,
-            r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName()));
+    private final ExecutorService uninterruptibleExecutor =
+            Executors.newFixedThreadPool(2, r -> new Thread(r, "HyracksConnection Uninterrubtible thread: "));
 
     private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1);
 
@@ -367,6 +367,11 @@
             return null;
         }
 
+        @Override
+        public String toString() {
+            return "CancelJobRequest: " + jobId.toString();
+        }
+
     }
 
     private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> {
@@ -407,24 +412,35 @@
             }
         }
 
+        @Override
+        public String toString() {
+            return "StartJobRequest";
+        }
+
     }
 
     private class UninterrubtileRequestHandler implements Runnable {
         @SuppressWarnings({ "squid:S2189", "squid:S2142" })
         @Override
         public void run() {
-            while (true) {
-                try {
-                    UnInterruptibleRequest<?> next = uninterruptibles.take();
-                    reqId++;
-                    running = true;
-                    next.handle();
-                } catch (InterruptedException e) {
-                    LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
-                    continue;
-                } finally {
-                    running = false;
+            String nameBefore = Thread.currentThread().getName();
+            Thread.currentThread().setName(nameBefore + getClass().getSimpleName());
+            try {
+                while (true) {
+                    try {
+                        UnInterruptibleRequest<?> current = uninterruptibles.take();
+                        reqId++;
+                        running = true;
+                        current.handle();
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+                        continue;
+                    } finally {
+                        running = false;
+                    }
                 }
+            } finally {
+                Thread.currentThread().setName(nameBefore);
             }
         }
     }
@@ -433,25 +449,31 @@
         @Override
         @SuppressWarnings({ "squid:S2189", "squid:S2142" })
         public void run() {
-            long currentReqId = 0L;
-            long currentTime = System.nanoTime();
-            while (true) {
-                try {
-                    TimeUnit.MINUTES.sleep(1);
-                } catch (InterruptedException e) {
-                    LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
-                    continue;
-                }
-                if (running) {
-                    if (reqId == currentReqId) {
-                        if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
-                            ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+            String nameBefore = Thread.currentThread().getName();
+            Thread.currentThread().setName(nameBefore + getClass().getSimpleName());
+            try {
+                long currentReqId = 0L;
+                long currentTime = System.nanoTime();
+                while (true) {
+                    try {
+                        TimeUnit.MINUTES.sleep(1);
+                    } catch (InterruptedException e) {
+                        LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+                        continue;
+                    }
+                    if (running) {
+                        if (reqId == currentReqId) {
+                            if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
+                                ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+                            }
+                        } else {
+                            currentReqId = reqId;
+                            currentTime = System.nanoTime();
                         }
-                    } else {
-                        currentReqId = reqId;
-                        currentTime = System.nanoTime();
                     }
                 }
+            } finally {
+                Thread.currentThread().setName(nameBefore);
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 5964c04..0f36c80 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -66,11 +66,11 @@
                 LOGGER.error("{} tasks associated with CC {} failed to complete after {}ms. Giving up",
                         runningTasks.size(), ccId, TIMEOUT);
                 logPendingTasks();
-                ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+                ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
             }
         } catch (Throwable th) {
             LOGGER.error("Failed to abort all previous tasks associated with CC {}", ccId, th);
-            ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+            ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index c8b9112..8500842 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -33,7 +33,7 @@
     public static final int EC_ABNORMAL_TERMINATION = 1;
     public static final int EC_FAILED_TO_STARTUP = 2;
     public static final int EC_FAILED_TO_RECOVER = 3;
-    public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
+    public static final int EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
     public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5;
     public static final int EC_UNHANDLED_EXCEPTION = 11;
     public static final int EC_IMMEDIATE_HALT = 33;
@@ -75,8 +75,8 @@
         exit(status);
     }
 
-    public static void halt(int status) {
-        LOGGER.fatal("JVM halting with status " + status + "; bye!", new Throwable("halt stacktrace"));
+    public static synchronized void halt(int status) {
+        LOGGER.fatal("JVM halting with status {}; thread dump at halt: {}", status, ThreadDumpUtil.takeDumpString());
         // try to give time for the log to be emitted...
         LogManager.shutdown();
         Runtime.getRuntime().halt(status);