Time out NC thread dump requests after 60 seconds

Change-Id: If4840c78a6f6a2916ee682a9061df62a50bedc8a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1213
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index be53232..7931cf8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -20,6 +20,10 @@
 
 import java.lang.management.ManagementFactory;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -27,6 +31,9 @@
 import org.apache.hyracks.control.common.work.ThreadDumpWork;
 
 public class GetThreadDumpWork extends ThreadDumpWork {
+    private static final Logger LOGGER = Logger.getLogger(ThreadDumpWork.class.getName());
+    public static final int TIMEOUT_SECS = 60;
+
     private final ClusterControllerService ccs;
     private final String nodeId;
     private final IResultCallback<String> callback;
@@ -55,8 +62,27 @@
                 try {
                     ncState.getNodeController().takeThreadDump(run.getRequestId());
                 } catch (Exception e) {
+                    ccs.removeThreadDumpRun(run.getRequestId());
                     callback.setException(e);
                 }
+                final long requestTime = System.currentTimeMillis();
+                ccs.getExecutor().execute(() -> {
+                    try {
+                        final long queueTime = System.currentTimeMillis() - requestTime;
+                        final long sleepTime = TimeUnit.SECONDS.toMillis(TIMEOUT_SECS) - queueTime;
+                        if (sleepTime > 0) {
+                            Thread.sleep(sleepTime);
+                        }
+                        if (ccs.removeThreadDumpRun(run.getRequestId()) != null) {
+                            LOGGER.log(Level.WARNING, "Timed out thread dump request " + run.getRequestId()
+                                    + " for node " + nodeId);
+                            callback.setException(new TimeoutException("Thread dump request for node " + nodeId
+                                    + " timed out after " + TIMEOUT_SECS + " seconds."));
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                });
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java
index bbdf211..2dae4b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyThreadDumpResponse.java
@@ -18,10 +18,13 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.util.logging.Logger;
+
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.work.AbstractWork;
 
 public class NotifyThreadDumpResponse extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(NotifyThreadDumpResponse.class.getName());
 
     private final ClusterControllerService ccs;
 
@@ -36,6 +39,12 @@
 
     @Override
     public void run() {
-        ccs.removeThreadDumpRun(requestId).notifyThreadDumpReceived(threadDumpJSON);
+        LOGGER.fine("Delivering thread dump response: " + requestId);
+        final GetThreadDumpWork.ThreadDumpRun threadDumpRun = ccs.removeThreadDumpRun(requestId);
+        if (threadDumpRun == null) {
+            LOGGER.warning("Thread dump run " + requestId + " not found; discarding reply: " + threadDumpJSON);
+        } else {
+            threadDumpRun.notifyThreadDumpReceived(threadDumpJSON);
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
index 1fc4690..85233b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NodeThreadDumpWork.java
@@ -33,7 +33,6 @@
     @Override
     protected void doRun() throws Exception {
         final String result = takeDump(ncs.getThreadMXBean());
-
         ncs.getClusterController().notifyThreadDump(
                 ncs.getApplicationContext().getNodeId(), requestId, result);
     }