[NO ISSUE][CLUS] Ignore Received Heartbeats Before App Initialization
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ignore received node heartbeats on CC when the CC executor
has not been initialized since nodes are not registered yet.
Change-Id: I65ef92ae3179214f1efecd1bb44da4772b7a3dd9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3395
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 78ed9b9..e022dfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.cc;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
@@ -73,9 +74,7 @@
ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs.getNodeManager(), unf.getNodeId()));
break;
case NODE_HEARTBEAT:
- CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
- ccs.getExecutor().execute(
- new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData(), nhf.getNcAddress()));
+ processNodeHeartbeat(ccs, fn);
break;
case NOTIFY_JOBLET_CLEANUP:
CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
@@ -170,4 +169,12 @@
LOGGER.warn("Unknown function: " + fn.getFunctionId());
}
}
+
+ private static void processNodeHeartbeat(ClusterControllerService ccs, CCNCFunctions.Function fn) {
+ final ExecutorService executor = ccs.getExecutor();
+ if (executor != null) {
+ CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
+ executor.execute(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData(), nhf.getNcAddress()));
+ }
+ }
}