[NO ISSUE] Allow override of dead node sweeper threshold
Change-Id: I3e534c277e52778ec4551247842acc51960a61b4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2922
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ae82803..b5dacfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -217,7 +217,7 @@
webServer.start();
info = new ClusterControllerInfo(ccId, ccConfig.getClientPublicAddress(), ccConfig.getClientPublicPort(),
ccConfig.getConsolePublicPort());
- timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriodMillis());
+ timer.schedule(sweeper, 0, ccConfig.getDeadNodeSweepThreshold());
jobLog.open();
startApplication();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index f83df6e..1cb2d05 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -60,6 +60,7 @@
CONSOLE_PUBLIC_PORT(INTEGER, CONSOLE_LISTEN_PORT),
HEARTBEAT_PERIOD(LONG, 10000L), // TODO (mblow): add time unit
HEARTBEAT_MAX_MISSES(INTEGER, 5),
+ DEAD_NODE_SWEEP_THRESHOLD(LONG, HEARTBEAT_PERIOD),
PROFILE_DUMP_PERIOD(INTEGER, 0),
JOB_HISTORY_SIZE(INTEGER, 10),
RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit
@@ -154,7 +155,9 @@
case HEARTBEAT_PERIOD:
return "Sets the time duration between two heartbeats from each node controller in milliseconds";
case HEARTBEAT_MAX_MISSES:
- return "Sets the maximum number of missed heartbeats before a node is marked as dead";
+ return "Sets the maximum number of missed heartbeats before a node can be considered dead";
+ case DEAD_NODE_SWEEP_THRESHOLD:
+ return "Sets the frequency (in milliseconds) to process nodes that can be considered dead";
case PROFILE_DUMP_PERIOD:
return "Sets the time duration between two profile dumps from each node controller in "
+ "milliseconds; 0 to disable";
@@ -326,6 +329,14 @@
configManager.set(Option.HEARTBEAT_MAX_MISSES, heartbeatMaxMisses);
}
+ public long getDeadNodeSweepThreshold() { // value is in milliseconds per the option description
+ return getAppConfig().getLong(Option.DEAD_NODE_SWEEP_THRESHOLD); // option default is HEARTBEAT_PERIOD
+ }
+
+ public void setDeadNodeSweepThreshold(long deadNodeSweepThreshold) { // milliseconds per the option description
+ configManager.set(Option.DEAD_NODE_SWEEP_THRESHOLD, deadNodeSweepThreshold); // NOTE(review): no range check here; the getter feeds timer.schedule's period argument - confirm a positive value is enforced elsewhere
+ }
+
public int getProfileDumpPeriod() {
return getAppConfig().getInt(Option.PROFILE_DUMP_PERIOD);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
index 0ea6399..08b8c11 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
@@ -63,7 +63,6 @@
}
public void notifyAck(HyracksDataException exception) {
- // TODO: we should also reregister in case of no ack
LOGGER.debug("ack rec'd from {} w/ exception: {}", ccId::toString, () -> String.valueOf(exception));
if (exception != null && exception.matches(ErrorCode.HYRACKS, ErrorCode.NO_SUCH_NODE)) {
LOGGER.info("{} indicates it does not recognize us; force a reconnect", ccId);