[ASTERIXDB-2284][CLUS] Ensure Node Failure on Heartbeat Miss
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Request the node which exceeded its heartbeat misses
to shutdown to ensure its failures.
- Ensure thread safety of lastHeartbeatNanoTime in
NodeControllerState.
Change-Id: I121f85fd858484377a9d888d18c3069c239f00fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2390
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 06af01f..415ca81 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -141,7 +141,7 @@
private int rrdPtr;
- private long lastHeartbeatNanoTime;
+ private volatile long lastHeartbeatNanoTime;
private NodeCapacity capacity;
@@ -254,10 +254,6 @@
return System.nanoTime() - lastHeartbeatNanoTime;
}
- public long getLastHeartbeatNanoTime() {
- return lastHeartbeatNanoTime;
- }
-
public NodeControllerRemoteProxy getNodeController() {
return nodeController;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 98cf67a..8f73864 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -48,9 +48,11 @@
import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+@NotThreadSafe
public class NodeManager implements INodeManager {
private static final Logger LOGGER = LogManager.getLogger();
@@ -99,7 +101,7 @@
// Updates the node registry.
if (nodeRegistry.containsKey(nodeId)) {
LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
- removeDeadNode(nodeId);
+ failNonDeadNode(nodeId);
} else {
try {
// TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
@@ -155,22 +157,23 @@
Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
String nodeId = entry.getKey();
NodeControllerState state = entry.getValue();
- if (state.nanosSinceLastHeartbeat() >= deadNodeNanosThreshold) {
+ final long nanosSinceLastHeartbeat = state.nanosSinceLastHeartbeat();
+ if (nanosSinceLastHeartbeat >= deadNodeNanosThreshold) {
+ ensureNodeFailure(nodeId, state);
deadNodes.add(nodeId);
affectedJobIds.addAll(state.getActiveJobIds());
- // Removes the node from node map.
nodeIterator.remove();
- // Removes the node from IP map.
removeNodeFromIpAddressMap(nodeId, state);
- // Updates the cluster capacity.
resourceManager.update(nodeId, new NodeCapacity(0L, 0));
- LOGGER.info(entry.getKey() + " considered dead");
+ LOGGER.info("{} considered dead. Last heartbeat received {}ms ago. Max miss period: {}ms", nodeId,
+ TimeUnit.NANOSECONDS.toMillis(nanosSinceLastHeartbeat),
+ TimeUnit.NANOSECONDS.toMillis(deadNodeNanosThreshold));
}
}
return Pair.of(deadNodes, affectedJobIds);
}
- public void removeDeadNode(String nodeId) throws HyracksException {
+ private void failNonDeadNode(String nodeId) throws HyracksException {
NodeControllerState state = nodeRegistry.get(nodeId);
Set<JobId> affectedJobIds = state.getActiveJobIds();
// Removes the node from node map.
@@ -196,7 +199,6 @@
nodeRegistry.forEach(nodeFunction::apply);
}
- // Removes the entry of the node in <code>ipAddressNodeNameMap</code>.
private void removeNodeFromIpAddressMap(String nodeId, NodeControllerState ncState) throws HyracksException {
InetAddress ipAddress = getIpAddress(ncState);
Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
@@ -209,7 +211,6 @@
}
}
- // Retrieves the IP address for a given node.
private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
String ipAddress = ncState.getNCConfig().getDataPublicAddress();
try {
@@ -222,4 +223,15 @@
private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * nodeCoresMultiplier);
}
+
+ private void ensureNodeFailure(String nodeId, NodeControllerState state) {
+ try {
+ LOGGER.info("Requesting node {} to shutdown to ensure failure", nodeId);
+ state.getNodeController().shutdown(false);
+ LOGGER.info("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
+ nodeId);
+ } catch (Exception ignore) {
+ LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
+ }
+ }
}