[ASTERIXDB-2284][CLUS] Ensure Node Failure on Heartbeat Miss

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Request a node that has exceeded its allowed heartbeat misses
  to shut down, to ensure it actually fails (see the sketch below).
- Ensure thread-safe access to lastHeartbeatNanoTime in
  NodeControllerState.
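
A minimal sketch of the ensure-failure idea, assuming hypothetical
RemoteNode/NodeState/HeartbeatSweeper types in place of the actual
NodeControllerRemoteProxy/NodeControllerState/NodeManager classes:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    final class HeartbeatSweeper {
        interface RemoteNode {
            // hypothetical stand-in for NodeControllerRemoteProxy#shutdown(boolean)
            void shutdown(boolean terminateJvm) throws Exception;
        }

        static final class NodeState {
            // volatile: written by the heartbeat thread, read by the sweeper thread
            volatile long lastHeartbeatNanoTime = System.nanoTime();
            final RemoteNode proxy;

            NodeState(RemoteNode proxy) {
                this.proxy = proxy;
            }

            long nanosSinceLastHeartbeat() {
                return System.nanoTime() - lastHeartbeatNanoTime;
            }
        }

        private final Map<String, NodeState> registry = new ConcurrentHashMap<>();
        private final long deadNodeNanosThreshold = TimeUnit.SECONDS.toNanos(30);

        void sweep() {
            Iterator<Map.Entry<String, NodeState>> it = registry.entrySet().iterator();
            while (it.hasNext()) {
                NodeState state = it.next().getValue();
                if (state.nanosSinceLastHeartbeat() >= deadNodeNanosThreshold) {
                    // Ask the suspect to shut down so a false positive still ends in a real
                    // failure; a truly dead node cannot respond, so errors are ignored.
                    try {
                        state.proxy.shutdown(false);
                    } catch (Exception ignore) {
                        // expected when the node is really down
                    }
                    it.remove();
                }
            }
        }
    }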

Change-Id: I121f85fd858484377a9d888d18c3069c239f00fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2390
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 06af01f..415ca81 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -141,7 +141,7 @@
 
     private int rrdPtr;
 
-    private long lastHeartbeatNanoTime;
+    private volatile long lastHeartbeatNanoTime;
 
     private NodeCapacity capacity;
 
@@ -254,10 +254,6 @@
         return System.nanoTime() - lastHeartbeatNanoTime;
     }
 
-    public long getLastHeartbeatNanoTime() {
-        return lastHeartbeatNanoTime;
-    }
-
     public NodeControllerRemoteProxy getNodeController() {
         return nodeController;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 98cf67a..8f73864 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -48,9 +48,11 @@
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+@NotThreadSafe
 public class NodeManager implements INodeManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -99,7 +101,7 @@
         // Updates the node registry.
         if (nodeRegistry.containsKey(nodeId)) {
             LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
-            removeDeadNode(nodeId);
+            failNonDeadNode(nodeId);
         } else {
             try {
                 // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
@@ -155,22 +157,23 @@
             Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
             String nodeId = entry.getKey();
             NodeControllerState state = entry.getValue();
-            if (state.nanosSinceLastHeartbeat() >= deadNodeNanosThreshold) {
+            final long nanosSinceLastHeartbeat = state.nanosSinceLastHeartbeat();
+            if (nanosSinceLastHeartbeat >= deadNodeNanosThreshold) {
+                ensureNodeFailure(nodeId, state);
                 deadNodes.add(nodeId);
                 affectedJobIds.addAll(state.getActiveJobIds());
-                // Removes the node from node map.
                 nodeIterator.remove();
-                // Removes the node from IP map.
                 removeNodeFromIpAddressMap(nodeId, state);
-                // Updates the cluster capacity.
                 resourceManager.update(nodeId, new NodeCapacity(0L, 0));
-                LOGGER.info(entry.getKey() + " considered dead");
+                LOGGER.info("{} considered dead. Last heartbeat received {}ms ago. Max miss period: {}ms", nodeId,
+                        TimeUnit.NANOSECONDS.toMillis(nanosSinceLastHeartbeat),
+                        TimeUnit.NANOSECONDS.toMillis(deadNodeNanosThreshold));
             }
         }
         return Pair.of(deadNodes, affectedJobIds);
     }
 
-    public void removeDeadNode(String nodeId) throws HyracksException {
+    private void failNonDeadNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
@@ -196,7 +199,6 @@
         nodeRegistry.forEach(nodeFunction::apply);
     }
 
-    // Removes the entry of the node in <code>ipAddressNodeNameMap</code>.
     private void removeNodeFromIpAddressMap(String nodeId, NodeControllerState ncState) throws HyracksException {
         InetAddress ipAddress = getIpAddress(ncState);
         Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
@@ -209,7 +211,6 @@
         }
     }
 
-    // Retrieves the IP address for a given node.
     private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
         String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
@@ -222,4 +223,15 @@
     private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
         return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * nodeCoresMultiplier);
     }
+
+    private void ensureNodeFailure(String nodeId, NodeControllerState state) {
+        try {
+            LOGGER.info("Requesting node {} to shutdown to ensure failure", nodeId);
+            state.getNodeController().shutdown(false);
+            LOGGER.info("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
+                    nodeId);
+        } catch (Exception ignore) {
+            LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
+        }
+    }
 }