[ASTERIXDB-1076][HYR] Prevent node death false positives
- Measure the actual time elapsed since the heartbeat was last touched, rather
than counting dead-node detection cycles since the last heartbeat was received
- Touch the heartbeat when a job result is received, in addition to when
heartbeat data is received
- Minor refactoring in NC/CC config
Change-Id: Idb1abcc2b783b192b88ed988d398fcfe763531e9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2097
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index f5e94b1..ecf25eb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -166,7 +166,7 @@
ncConfig.getConfigManager().processConfig();
// get initial partitions from config
- String[] nodeStores = ncConfig.getAppConfig().getStringArray(NCConfig.Option.IODEVICES);
+ String[] nodeStores = ncConfig.getNodeScopedAppConfig().getStringArray(NCConfig.Option.IODEVICES);
if (nodeStores == null) {
throw new IllegalStateException("Couldn't find stores for NC: " + ncConfig.getNodeId());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
index 54ae838..3eff037 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
@@ -56,7 +56,7 @@
ccConfig.setClientListenPort(TEST_HYRACKS_CC_CLIENT_PORT);
ccConfig.setJobHistorySize(0);
ccConfig.setProfileDumpPeriod(-1);
- ccConfig.setHeartbeatPeriod(50);
+ ccConfig.setHeartbeatPeriodMillis(50);
// cluster controller
cc = new ClusterControllerService(ccConfig);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index dfc79ed..a3fbb70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -207,7 +207,7 @@
webServer.start();
info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
webServer.getListeningPort());
- timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod());
+ timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
jobLog.open();
startApplication();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 7be6524..fe4ce89 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -141,7 +141,7 @@
private int rrdPtr;
- private int lastHeartbeatDuration;
+ private long lastHeartbeatNanoTime;
private NodeCapacity capacity;
@@ -207,10 +207,11 @@
rrdPtr = 0;
capacity = reg.getCapacity();
+ touchHeartbeat();
}
public synchronized void notifyHeartbeat(HeartbeatData hbData) {
- lastHeartbeatDuration = 0;
+ touchHeartbeat();
hbTime[rrdPtr] = System.currentTimeMillis();
if (hbData != null) {
heapInitSize[rrdPtr] = hbData.heapInitSize;
@@ -247,8 +248,16 @@
}
}
- public int incrementLastHeartbeatDuration() {
- return lastHeartbeatDuration++;
+ public void touchHeartbeat() {
+ lastHeartbeatNanoTime = System.nanoTime();
+ }
+
+ public long nanosSinceLastHeartbeat() {
+ return System.nanoTime() - lastHeartbeatNanoTime;
+ }
+
+ public long getLastHeartbeatNanoTime() {
+ return lastHeartbeatNanoTime;
}
public NodeControllerRemoteProxy getNodeController() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 2d43d42..a380967 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -29,6 +29,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
@@ -147,11 +148,13 @@
Set<String> deadNodes = new HashSet<>();
Set<JobId> affectedJobIds = new HashSet<>();
Iterator<Map.Entry<String, NodeControllerState>> nodeIterator = nodeRegistry.entrySet().iterator();
+ long deadNodeNanosThreshold = TimeUnit.MILLISECONDS
+ .toNanos(ccConfig.getHeartbeatMaxMisses() * ccConfig.getHeartbeatPeriodMillis());
while (nodeIterator.hasNext()) {
Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
String nodeId = entry.getKey();
NodeControllerState state = entry.getValue();
- if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) {
+ if (state.nanosSinceLastHeartbeat() >= deadNodeNanosThreshold) {
deadNodes.add(nodeId);
affectedJobIds.addAll(state.getActiveJobIds());
// Removes the node from node map.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 3babf00..446bfd1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.job.ActivityPlan;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
@@ -75,6 +76,10 @@
}
}
}
+ final NodeControllerState ncState = ccs.getNodeManager().getNodeControllerState(nodeId);
+ if (ncState != null) {
+ ncState.touchHeartbeat();
+ }
}
protected abstract void performEvent(TaskAttempt ta);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 5866ba5..d1d2208 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -67,7 +67,7 @@
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
params.setDistributedState(ccs.getContext().getDistributedState());
- params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
+ params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index cbc6146..c04d5b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -51,7 +51,7 @@
CLIENT_LISTEN_PORT(INTEGER, 1098),
CONSOLE_LISTEN_ADDRESS(STRING, ADDRESS),
CONSOLE_LISTEN_PORT(INTEGER, 16001),
- HEARTBEAT_PERIOD(INTEGER, 10000), // TODO (mblow): add time unit
+ HEARTBEAT_PERIOD(LONG, 10000L), // TODO (mblow): add time unit
HEARTBEAT_MAX_MISSES(INTEGER, 5),
PROFILE_DUMP_PERIOD(INTEGER, 0),
JOB_HISTORY_SIZE(INTEGER, 10),
@@ -176,8 +176,6 @@
}
}
- private final ConfigManager configManager;
-
private List<String> appArgs = new ArrayList<>();
public CCConfig() {
@@ -186,7 +184,6 @@
public CCConfig(ConfigManager configManager) {
super(configManager);
- this.configManager = configManager;
configManager.register(Option.class);
configManager.registerArgsListener(appArgs::addAll);
}
@@ -207,10 +204,6 @@
return configManager.toIni(false);
}
- public ConfigManager getConfigManager() {
- return configManager;
- }
-
// QQQ Note that clusterListenAddress is *not directly used* yet. Both
// the cluster listener and the web server listen on "all interfaces".
// This IP address is only used to instruct the NC on which IP to call in.
@@ -270,11 +263,11 @@
configManager.set(Option.CONSOLE_LISTEN_PORT, consoleListenPort);
}
- public int getHeartbeatPeriod() {
- return getAppConfig().getInt(Option.HEARTBEAT_PERIOD);
+ public long getHeartbeatPeriodMillis() {
+ return getAppConfig().getLong(Option.HEARTBEAT_PERIOD);
}
- public void setHeartbeatPeriod(int heartbeatPeriod) {
+ public void setHeartbeatPeriodMillis(long heartbeatPeriod) {
configManager.set(Option.HEARTBEAT_PERIOD, heartbeatPeriod);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
index 1745e2a..19c89e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
@@ -86,6 +86,10 @@
return configManager.getAppConfig();
}
+ public ConfigManager getConfigManager() {
+ return configManager;
+ }
+
public String getConfigFile() {
return getAppConfig().getString(ControllerConfig.Option.CONFIG_FILE);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index bd5895e..e8c96d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -247,12 +247,7 @@
return appArgs.toArray(new String[appArgs.size()]);
}
- public ConfigManager getConfigManager() {
- return configManager;
- }
-
- @Override
- public IApplicationConfig getAppConfig() {
+ public IApplicationConfig getNodeScopedAppConfig() {
return appConfig;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
index 2264a3f..bf233a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
@@ -29,7 +29,7 @@
private Serializable distributedState;
- private int heartbeatPeriod;
+ private long heartbeatPeriod;
private int profileDumpPeriod;
@@ -49,11 +49,11 @@
this.distributedState = distributedState;
}
- public int getHeartbeatPeriod() {
+ public long getHeartbeatPeriod() {
return heartbeatPeriod;
}
- public void setHeartbeatPeriod(int heartbeatPeriod) {
+ public void setHeartbeatPeriod(long heartbeatPeriod) {
this.heartbeatPeriod = heartbeatPeriod;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 4a2c2e9..75d76c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -366,7 +366,8 @@
}
private void startApplication() throws Exception {
- serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
+ serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm,
+ ncConfig.getNodeScopedAppConfig());
application.init(serviceCtx);
executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
application.start(ncConfig.getAppArgsArray());
@@ -486,7 +487,7 @@
private final HeartbeatData hbData;
- HeartbeatTask(IClusterController cc, int heartbeatPeriod) {
+ HeartbeatTask(IClusterController cc, long heartbeatPeriod) {
this.cc = cc;
this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
hbData = new HeartbeatData();
@@ -559,7 +560,7 @@
hbData.diskReads = ioCounter.getReads();
hbData.diskWrites = ioCounter.getWrites();
- hbData.numCores = Runtime.getRuntime().availableProcessors() - 1; // Reserves one core for heartbeats.
+ hbData.numCores = Runtime.getRuntime().availableProcessors();
try {
cc.nodeHeartbeat(id, hbData);
@@ -568,7 +569,11 @@
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s", e);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.log(Level.FINE, "Exception sending heartbeat; will retry after 1s", e);
+ } else {
+ LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s: " + e.toString());
+ }
return false;
}
}