[NO ISSUE][CLUS] Asynchronous reregistration with CC
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Perform CC reregistration asynchronously on a
separate thread when the connection is restored.
- Ensure a single registration is sent to CC when
the connection is restored.
- Restart JVM on unexpected CC registration request
failures.
- Halt JVM on registration failures or timeout.
Change-Id: I404256e6f550c42a6eaf17c0ae4defb7ffb9cb2f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2505
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 1c6c98e..dce7d35 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -18,10 +18,15 @@
*/
package org.apache.hyracks.control.nc;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -31,6 +36,7 @@
private final IClusterController ccs;
private boolean registrationPending;
+ private boolean registrationCompleted;
private Exception registrationException;
private NodeParameters nodeParameters;
@@ -57,12 +63,14 @@
public synchronized CcId registerNode(NodeRegistration nodeRegistration, int registrationId) throws Exception {
registrationPending = true;
ccs.registerNode(nodeRegistration, registrationId);
- while (registrationPending) {
- wait();
+ try {
+ InvokeUtil.runWithTimeout(this::wait, () -> !registrationPending, 2, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ registrationException = e;
}
if (registrationException != null) {
- LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException);
- throw registrationException;
+ LOGGER.fatal("Registering with {} failed with exception", this, registrationException);
+ ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
}
return getCcId();
}
@@ -74,4 +82,27 @@
public NodeParameters getNodeParameters() {
return nodeParameters;
}
+
+ public synchronized void notifyConnectionRestored(NodeControllerService ncs, InetSocketAddress ccAddress)
+ throws InterruptedException {
+ if (registrationCompleted) {
+ registrationCompleted = false;
+ ncs.getExecutor().submit(() -> {
+ try {
+ return ncs.registerNode(this, ccAddress);
+ } catch (Exception e) {
+ LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+ throw new IllegalStateException(e);
+ }
+ });
+ }
+ while (!registrationCompleted) {
+ wait();
+ }
+ }
+
+ public synchronized void notifyRegistrationCompleted() {
+ registrationCompleted = true;
+ notifyAll();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0756210..98f5c70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -352,10 +352,11 @@
@Override
public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
// we need to re-register in case of NC -> CC connection reset
+ final CcConnection ccConnection = getCcConnection(ccAddressMap.get(ccAddress));
try {
- registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+ ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IPCException(e);
}
}
@@ -412,7 +413,7 @@
}
}
- private CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+ public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
LOGGER.info("Registering with Cluster Controller {}", ccc);
int registrationId = nextRegistrationId.incrementAndGet();
@@ -444,7 +445,7 @@
ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
ccTimers.put(ccId, ccTimer);
}
-
+ ccc.notifyRegistrationCompleted();
LOGGER.info("Registering with Cluster Controller {} complete", ccc);
return ccId;
}