[ASTERIXDB-2320][CLUS] Don't delay removing dead node on max heartbeat misses

Once heartbeat miss threshold miss has occurred, we attempt to contact the failed nc
to confirm it is down, and if not, force him to fail and reconnect.  This contact
attempt can take some time, ensure we do not delay marking the node as dead due to
this.

Change-Id: I89c0241fbc88fb6c4150201e4bcba07a3548d3f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2475
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8246cad..e1a36cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -94,7 +94,7 @@
     }
 
     @Override
-    public void addNode(String nodeId, NodeControllerState ncState) throws HyracksException {
+    public synchronized void addNode(String nodeId, NodeControllerState ncState) throws HyracksException {
         LOGGER.warn("addNode(" + nodeId + ") called");
         if (nodeId == null || ncState == null) {
             throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
@@ -131,7 +131,7 @@
 
     @Override
     @Idempotent
-    public void removeNode(String nodeId) throws HyracksException {
+    public synchronized void removeNode(String nodeId) throws HyracksException {
         NodeControllerState ncState = nodeRegistry.remove(nodeId);
         if (ncState == null) {
             LOGGER.warn("request to remove unknown node {}; ignoring", nodeId);
@@ -152,7 +152,7 @@
     }
 
     @Override
-    public Pair<Collection<String>, Collection<JobId>> removeDeadNodes() throws HyracksException {
+    public synchronized Pair<Collection<String>, Collection<JobId>> removeDeadNodes() throws HyracksException {
         Set<String> deadNodes = new HashSet<>();
         Set<JobId> affectedJobIds = new HashSet<>();
         Iterator<Map.Entry<String, NodeControllerState>> nodeIterator = nodeRegistry.entrySet().iterator();
@@ -178,7 +178,7 @@
         return Pair.of(deadNodes, affectedJobIds);
     }
 
-    public void failNode(String nodeId) throws HyracksException {
+    public synchronized void failNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
@@ -230,13 +230,15 @@
     }
 
     private void ensureNodeFailure(String nodeId, NodeControllerState state) {
-        try {
-            LOGGER.info("Requesting node {} to shutdown to ensure failure", nodeId);
-            state.getNodeController().shutdown(false);
-            LOGGER.info("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
-                    nodeId);
-        } catch (Exception ignore) {
-            LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
-        }
+        ccs.getExecutor().submit(() -> {
+            try {
+                LOGGER.info("Requesting node {} to shutdown to ensure failure", nodeId);
+                state.getNodeController().shutdown(false);
+                LOGGER.warn("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
+                        nodeId);
+            } catch (Exception ignore) {
+                LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
+            }
+        });
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 931e436..9959e34 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -22,6 +22,8 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.Executors;
+
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -134,6 +136,7 @@
         Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem);
         Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle);
         Mockito.when(ipcSystem.getHandle(Mockito.any(), Mockito.anyInt())).thenReturn(ipcHandle);
+        Mockito.when(ccs.getExecutor()).thenReturn(Executors.newCachedThreadPool());
         return ccs;
     }