NC Shutdown Hangs On Stuck Work, Tweak Shutdown Timeouts

1. Fix NC WorkQueue shutdown to interrupt() possibly stuck work
2. Adjust NC shutdown timeout on CC to allow NC to timeout any work it
   is awaiting
3. Improved logging (i.e. /printStackTrace()/LOGGER.log.../)
4. Bypass work queue for shutdown & thread dump request works

Change-Id: I12a9577c570c095afeac882664d29f0c8f53a4ad
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1217
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f88f30f..fda9cc3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -22,6 +22,7 @@
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -46,21 +47,22 @@
 
     @Override
     public ClusterControllerInfo getClusterControllerInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif =
+                new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
         return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
     }
 
     @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                new HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId);
         return (JobStatus) rpci.call(ipcHandle, gjsf);
     }
 
     @Override
     public JobId startJob(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
-        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
-                acggfBytes, jobFlags);
+        HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                new HyracksClientInterfaceFunctions.StartJobFunction(acggfBytes, jobFlags);
         return (JobId) rpci.call(ipcHandle, sjf);
     }
 
@@ -73,47 +75,50 @@
 
     @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
+        HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf =
+                new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
         return (NetworkAddress) rpci.call(ipcHandle, gddsf);
     }
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                new HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId);
         rpci.call(ipcHandle, wfcf);
     }
 
     @Override
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
-        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif =
+                new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
         return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
     }
 
     @Override
     public ClusterTopology getClusterTopology() throws Exception {
-        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf =
+                new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
         return (ClusterTopology) rpci.call(ipcHandle, gctf);
     }
 
     @Override
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
-        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
-                binaryURLs, deploymentId);
+        HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
     @Override
     public void unDeployBinary(DeploymentId deploymentId) throws Exception {
-        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(
-                deploymentId);
+        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf =
+                new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(deploymentId);
         rpci.call(ipcHandle, dbf);
     }
 
     @Override
     public JobInfo getJobInfo(JobId jobId) throws Exception {
-        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new HyracksClientInterfaceFunctions.GetJobInfoFunction(
-                jobId);
+        HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId);
         return (JobInfo) rpci.call(ipcHandle, gjsf);
     }
 
@@ -122,14 +127,15 @@
         HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf =
                 new HyracksClientInterfaceFunctions.ClusterShutdownFunction(terminateNCService);
         rpci.call(ipcHandle, csdf);
-        //give the CC some time to do final settling after it returns our request
-        for (int i = 3; ipcHandle.isConnected() && i > 0; i--) {
+        // give the CC some time to do final settling after it returns our request
+        int seconds = 30;
+        while (ipcHandle.isConnected() && --seconds > 0) {
             synchronized (this) {
-                wait(3000l); //3sec
+                wait(TimeUnit.SECONDS.toMillis(1));
             }
         }
         if (ipcHandle.isConnected()) {
-            throw new IPCException("CC refused to release connection after 9 seconds");
+            throw new IPCException("CC refused to release connection after 30 seconds");
         }
     }
 
@@ -145,6 +151,5 @@
         HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
                 new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
         return (String)rpci.call(ipcHandle, tdf);
-
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
index 29e1f83..e05dfbc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -50,7 +50,7 @@
     public void doRun() {
         try {
             if (ccs.getShutdownRun() != null) {
-                throw new IPCException("Shutdown in Progress");
+                throw new IPCException("Shutdown already in progress");
             }
             Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
             Set<String> nodeIds = new TreeSet<>();
@@ -73,12 +73,13 @@
                         /*
                          * wait for all our acks
                          */
+                        LOGGER.info("Waiting for NCs to shutdown...");
                         boolean cleanShutdown = shutdownStatus.waitForCompletion();
                         if (!cleanShutdown) {
                             /*
                              * best effort - just exit, user will have to kill misbehaving NCs
                              */
-                            LOGGER.severe("Clean shutdown of NCs timed out- giving up!  Unresponsive nodes: " +
+                            LOGGER.severe("Clean shutdown of NCs timed out- giving up; unresponsive nodes: " +
                                     shutdownStatus.getRemainingNodes());
                         }
                         callback.setValue(cleanShutdown);
@@ -96,12 +97,13 @@
         }
     }
 
-    protected void shutdownNode(String key, NodeControllerState ncState) {
+    protected void shutdownNode(String nodeId, NodeControllerState ncState) {
         try {
+            LOGGER.info("Notifying NC " + nodeId + " to shutdown...");
             ncState.getNodeController().shutdown(terminateNCService);
         } catch (Exception e) {
-            LOGGER.log(
-                    Level.INFO, "Exception shutting down NC " + key + " (possibly dead?), continuing shutdown...", e);
+            LOGGER.log(Level.INFO,
+                    "Exception shutting down NC " + nodeId + " (possibly dead?), continuing shutdown...", e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
index 4e5c98f..0a50f6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java
@@ -21,12 +21,13 @@
 
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 public class ShutdownRun implements IShutdownStatusConditionVariable{
 
     private final Set<String> shutdownNodeIds = new TreeSet<>();
     private boolean shutdownSuccess = false;
-    private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds
+    private static final long SHUTDOWN_TIMER_MS = TimeUnit.SECONDS.toMillis(30);
 
     public ShutdownRun(Set<String> nodeIds) {
         shutdownNodeIds.addAll(nodeIds);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
index f9df54b..fe0821f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/WorkQueue.java
@@ -45,7 +45,7 @@
             throw new IllegalArgumentException("Illegal thread priority number.");
         }
         this.threadPriority = threadPriority;
-        queue = new LinkedBlockingQueue<AbstractWork>();
+        queue = new LinkedBlockingQueue<>();
         thread = new WorkerThread(id);
         stopSemaphore = new Semaphore(1);
         stopped = true;
@@ -59,6 +59,7 @@
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new HyracksException(e);
         }
         if (DEBUG) {
@@ -73,14 +74,11 @@
         synchronized (this) {
             stopped = true;
         }
-        schedule(new AbstractWork() {
-            @Override
-            public void run() {
-            }
-        });
+        thread.interrupt();
         try {
             stopSemaphore.acquire();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new HyracksException(e);
         }
     }
@@ -119,8 +117,8 @@
                     }
                     try {
                         r = queue.take();
-                    } catch (InterruptedException e) {
-                        continue;
+                    } catch (InterruptedException e) { // NOSONAR: aborting the thread
+                        break;
                     }
                     if (DEBUG) {
                         LOGGER.log(Level.FINEST,
@@ -133,7 +131,7 @@
                         }
                         r.run();
                     } catch (Exception e) {
-                        e.printStackTrace();
+                        LOGGER.log(Level.WARNING, "Exception while executing " + r, e);
                     }
                 }
             } finally {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7facf0..d2d4811 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -98,7 +98,7 @@
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NodeControllerService implements IControllerService {
-    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
 
@@ -182,7 +182,7 @@
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
-        jobletMap = new Hashtable<JobId, Joblet>();
+        jobletMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                 new File(new File(NodeControllerService.class.getName()), id));
@@ -192,7 +192,7 @@
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
         registrationPending = true;
-        getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
+        getNodeControllerInfosAcceptor = new MutableObject<>();
         memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
         ioCounter = new IOCounterFactory().getIOCounter();
     }
@@ -210,7 +210,7 @@
     }
 
     private static List<IODeviceHandle> getDevices(String ioDevices) {
-        List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+        List<IODeviceHandle> devices = new ArrayList<>();
         StringTokenizer tok = new StringTokenizer(ioDevices, ",");
         while (tok.hasMoreElements()) {
             String devPath = tok.nextToken().trim();
@@ -227,7 +227,7 @@
     }
 
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
-        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
         synchronized (getNodeControllerInfosAcceptor) {
             while (getNodeControllerInfosAcceptor.getValue() != null) {
                 getNodeControllerInfosAcceptor.wait();
@@ -350,7 +350,7 @@
             LOGGER.log(Level.INFO, "Stopping NodeControllerService");
             executor.shutdownNow();
             if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
-                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
+                LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing with abnormal shutdown");
             }
             partitionManager.close();
             datasetPartitionManager.close();
@@ -480,7 +480,7 @@
             try {
                 cc.nodeHeartbeat(id, hbData);
             } catch (Exception e) {
-                e.printStackTrace();
+                LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
             }
         }
     }
@@ -495,7 +495,7 @@
         @Override
         public void run() {
             try {
-                FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
+                FutureValue<List<JobProfile>> fv = new FutureValue<>();
                 BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
                 queue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
@@ -503,7 +503,7 @@
                     cc.reportProfile(id, profiles);
                 }
             } catch (Exception e) {
-                e.printStackTrace();
+                LOGGER.log(Level.WARNING, "Exception reporting profile", e);
             }
         }
     }
@@ -573,15 +573,17 @@
 
                 case SHUTDOWN_REQUEST:
                     final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
-                    queue.schedule(new ShutdownWork(NodeControllerService.this, sdrf.isTerminateNCService()));
+                    executor.submit(new ShutdownWork(NodeControllerService.this, sdrf.isTerminateNCService()));
                     return;
 
                 case THREAD_DUMP_REQUEST:
                     final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
-                    queue.schedule(new NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId()));
+                    executor.submit(new NodeThreadDumpWork(NodeControllerService.this, tdrf.getRequestId()));
                     return;
+
+                default:
+                    throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
             }
-            throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 
         }
     }
@@ -611,15 +613,11 @@
 
         @Override
         public void run() {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shutdown hook in progress");
-            }
+            LOGGER.info("Shutdown hook in progress");
             try {
                 nodeControllerService.stop();
             } catch (Exception e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Exception in executing shutdown hook" + e);
-                }
+                LOGGER.log(Level.WARNING, "Exception in executing shutdown hook", e);
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java
index 4558a91..c195c98 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ShutdownWork.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class ShutdownWork extends AbstractWork {
-    private static Logger LOGGER = Logger.getLogger(ShutdownWork.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(ShutdownWork.class.getName());
     private final NodeControllerService ncs;
     private final boolean terminateNCService;
 
@@ -43,25 +43,25 @@
             ccs.notifyShutdown(ncs.getId());
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Exception notifying CC of shutdown acknowledgment", e);
-            throw new RuntimeException(e);
+            // proceed with shutdown
         }
 
         LOGGER.info("JVM Exiting.. Bye!");
-            //run the shutdown in a new thread, so we don't block this last work task
-            Thread t = new Thread("NC " + ncs.getId() + " Shutdown") {
-                @Override
-                public void run() {
-                    try {
-                        ncs.stop();
-                    } catch (Exception e) {
-                        LOGGER.log(Level.SEVERE, "Exception stopping node controller service", e);
-                    } finally {
-                        Runtime rt = Runtime.getRuntime();
-                        rt.exit(terminateNCService ? 99 : 0);
-                    }
+        //run the shutdown in a new thread, so we don't block this last work task
+        Thread t = new Thread("NC " + ncs.getId() + " Shutdown") {
+            @Override
+            public void run() {
+                try {
+                    ncs.stop();
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Exception stopping node controller service", e);
+                } finally {
+                    Runtime rt = Runtime.getRuntime();
+                    rt.exit(terminateNCService ? 99 : 0);
                 }
-            };
-            t.start();
+            }
+        };
+        t.start();
     }
 
 }