[ASTERIXDB-1076][HYR] Generate heartbeats in their own thread
- Generate & send NC heartbeats in their own thread to prevent starvation
/ scheduling issues
- Fix retries on IPC connections
- Don't spin on heartbeat send failure
Change-Id: Ieae21b1596013a699f27975fb21894244c536395
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2060
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 02a469d..bd5895e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -168,7 +168,7 @@
case MESSAGING_PUBLIC_PORT:
return "Public IP port to announce messaging listener";
case CLUSTER_CONNECT_RETRIES:
- return "Number of attempts to contact CC before giving up";
+ return "Number of attempts to retry contacting CC before giving up";
case IODEVICES:
return "Comma separated list of IO Device mount points";
case NET_THREAD_COUNT:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 4707487..ede2c41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -51,8 +51,9 @@
}
@Override
- protected int getRetries(boolean first) {
- return first ? clusterConnectRetries : 0;
+ protected int getMaxRetries(boolean first) {
+ // -1 == retry forever
+ return first ? clusterConnectRetries : -1;
}
@Override
@@ -104,7 +105,7 @@
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
- ensureIpcHandle().send(-1, fn, null);
+ ensureIpcHandle(0).send(-1, fn, null);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index d4ccbd9..83972d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -45,22 +45,25 @@
}
protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+ return ensureIpcHandle(getMaxRetries(ipcHandle == null));
+ }
+
+ protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException {
+ if (ipcHandle != null && ipcHandle.isConnected()) {
+ return ipcHandle;
+ }
try {
final boolean first = ipcHandle == null;
- if (first || !ipcHandle.isConnected()) {
- if (!first) {
- getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
- eventListener.ipcHandleDisconnected(ipcHandle);
- }
- ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
- if (ipcHandle.isConnected()) {
- if (first) {
- eventListener.ipcHandleConnected(ipcHandle);
- } else {
- getLogger().warning("ipcHandle " + ipcHandle + " restored");
- eventListener.ipcHandleRestored(ipcHandle);
- }
- }
+ if (!first) {
+ getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+ eventListener.ipcHandleDisconnected(ipcHandle);
+ }
+ ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
+ if (first) {
+ eventListener.ipcHandleConnected(ipcHandle);
+ } else {
+ getLogger().warning("ipcHandle " + ipcHandle + " restored");
+ eventListener.ipcHandleRestored(ipcHandle);
}
} catch (IPCException e) {
throw HyracksDataException.create(e);
@@ -68,7 +71,12 @@
return ipcHandle;
}
- protected abstract int getRetries(boolean first);
+ /**
+ * Maximum number of times to retry a failed connection attempt
+ * @param first true if the initial connection attempt (i.e. server start)
+ * @return the maximum number of retries, if any. <0 means retry forever
+ */
+ protected abstract int getMaxRetries(boolean first);
protected abstract Logger getLogger();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 68a5b76..41284a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,8 +48,9 @@
}
@Override
- protected int getRetries(boolean first) {
- return 0;
+ protected int getMaxRetries(boolean first) {
+ // -1 == retry forever
+ return -1;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 350343b..69137e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -27,7 +27,6 @@
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Hashtable;
@@ -37,6 +36,7 @@
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
@@ -98,6 +98,7 @@
private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
private static final double MEMORY_FUDGE_FACTOR = 0.8;
+ private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
private NCConfig ncConfig;
@@ -133,7 +134,7 @@
private NodeParameters nodeParameters;
- private HeartbeatTask heartbeatTask;
+ private Thread heartbeatThread;
private final ServerContext serverCtx;
@@ -308,15 +309,6 @@
workQueue.start();
- heartbeatTask = new HeartbeatTask(ccs);
-
- // Use reflection to set the priority of the timer thread.
- Field threadField = timer.getClass().getDeclaredField("thread");
- threadField.setAccessible(true);
- Thread timerThread = (Thread) threadField.get(timer); // The internal timer thread of the Timer object.
- timerThread.setPriority(Thread.MAX_PRIORITY);
- // Schedule heartbeat generator.
- timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
// Schedule tracing a human-readable datetime
timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
@@ -362,6 +354,12 @@
registrationException);
throw registrationException;
}
+ // Start heartbeat generator.
+ heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
+ heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+ heartbeatThread.setDaemon(true);
+ heartbeatThread.start();
+
serviceCtx.setDistributedState(nodeParameters.getDistributedState());
application.onRegisterNode();
LOGGER.info("Registering with Cluster Controller complete");
@@ -401,7 +399,10 @@
* Stop heartbeat after NC has stopped to avoid false node failure detection
* on CC if an NC takes a long time to stop.
*/
- heartbeatTask.cancel();
+ if (heartbeatThread != null) {
+ heartbeatThread.interrupt();
+ heartbeatThread.join(1000); // give it 1s to stop gracefully
+ }
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
} else {
LOGGER.log(Level.SEVERE, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
@@ -478,17 +479,16 @@
return workQueue;
}
- public ThreadMXBean getThreadMXBean() {
- return threadMXBean;
- }
-
- private class HeartbeatTask extends TimerTask {
- private IClusterController cc;
+ private class HeartbeatTask implements Runnable {
+ private final Semaphore delayBlock = new Semaphore(0);
+ private final IClusterController cc;
+ private final long heartbeatPeriodNanos;
private final HeartbeatData hbData;
- public HeartbeatTask(IClusterController cc) {
+ HeartbeatTask(IClusterController cc, int heartbeatPeriod) {
this.cc = cc;
+ this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
hbData = new HeartbeatData();
hbData.gcCollectionCounts = new long[gcMXBeans.size()];
hbData.gcCollectionTimes = new long[gcMXBeans.size()];
@@ -496,6 +496,28 @@
@Override
public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos;
+ final boolean success = execute();
+ sleepUntilNextFire(success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS);
+ } catch (InterruptedException e) { // NOSONAR
+ break;
+ }
+ }
+ LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting down");
+ }
+
+ private void sleepUntilNextFire(long delayNanos) throws InterruptedException {
+ if (delayNanos > 0) {
+ delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire
+ } else {
+ LOGGER.warning("After sending heartbeat, next one is already late by "
+ + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without delay");
+ }
+ }
+
+ private boolean execute() throws InterruptedException {
MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
hbData.heapInitSize = heapUsage.getInit();
hbData.heapUsedSize = heapUsage.getUsed();
@@ -541,8 +563,13 @@
try {
cc.nodeHeartbeat(id, hbData);
+ LOGGER.log(Level.FINE, "Successfully sent heartbeat");
+ return true;
+ } catch (InterruptedException e) {
+ throw e;
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Exception sending heartbeat", e);
+ LOGGER.log(Level.SEVERE, "Exception sending heartbeat; will retry after 1s", e);
+ return false;
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index d1659a8..36cf2fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -47,6 +47,10 @@
public class IPCConnectionManager {
private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
+ // TODO(mblow): the next two could be config parameters
+ private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
+ private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+
private final IPCSystem system;
private final NetworkThread networkThread;
@@ -99,9 +103,10 @@
networkThread.selector.wakeup();
}
- IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
+ IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException {
IPCHandle handle;
- int attempt = 1;
+ int retries = 0;
+ int delay = INITIAL_RETRY_DELAY_MILLIS;
while (true) {
synchronized (this) {
handle = ipcHandleMap.get(remoteAddress);
@@ -114,19 +119,11 @@
if (handle.waitTillConnected()) {
return handle;
}
- if (retries < 0) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
- attempt++;
- Thread.sleep(5000);
- }
- } else if (attempt < retries) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Connection to " + remoteAddress + " failed (Attempt " + attempt + " of " + retries
- + ")");
- attempt++;
- Thread.sleep(5000);
- }
+ if (maxRetries < 0 || retries++ < maxRetries) {
+ LOGGER.warning("Connection to " + remoteAddress + " failed; retrying" + (maxRetries <= 0 ? ""
+ : " (retry attempt " + retries + " of " + maxRetries + ") after " + delay + "ms"));
+ Thread.sleep(delay);
+ delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
} else {
throw new IOException("Connection failed to " + remoteAddress);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index dea48bd..f27b268 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -68,9 +68,9 @@
return getHandle(remoteAddress, 0);
}
- public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException {
try {
- return cMgr.getIPCHandle(remoteAddress, retries);
+ return cMgr.getIPCHandle(remoteAddress, maxRetries);
} catch (IOException e) {
throw new IPCException(e);
} catch (InterruptedException e) {