[ASTERIXDB-2031][HYR] Kill NC when shutdown hangs
- user model changes: Shutdown doesn't hang anymore
- storage format changes: no
- interface changes: no
details:
- A watchdog is added to the shutdown hook to ensure it completes
within a pre-specified time window. If the window passes before
shutdown completes, the JVM is killed.
Change-Id: I9de911f81d6b3723e7cc3674bd80d56df8203c0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1934
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/lifecycle/LifeCycleComponentManager.java
index 4674f9a..76fa322 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -51,11 +51,10 @@
@Override
public void uncaughtException(Thread t, Throwable e) {
- LOGGER.log(Level.SEVERE, "Uncaught Exception from thread " + t.getName(), e);
try {
- stopAll(true);
- } catch (IOException e1) {
- LOGGER.log(Level.SEVERE, "Exception in stopping instance", e1);
+ LOGGER.log(Level.SEVERE, "Uncaught Exception from thread " + t.getName() + ". Calling shutdown hook", e);
+ } finally {
+ Runtime.getRuntime().exit(99);// NOSONAR: It is really required
}
}
@@ -73,13 +72,14 @@
@Override
public synchronized void stopAll(boolean dumpState) throws IOException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Attempting to stop " + this);
- }
if (stopped) {
LOGGER.info("Lifecycle management was already stopped");
return;
}
+ stopped = true;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Attempting to stop " + this);
+ }
if (stopInitiated) {
LOGGER.info("Stop already in progress");
return;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java
index a8b9461..4d0c159 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java
@@ -23,21 +23,51 @@
/**
* Shutdown hook that invokes {@link NodeControllerService#stop() stop} method.
+ * This shutdown hook must have a failsafe mechanism to halt the process in case the shutdown
+ * operation is hanging for any reason
*/
public class NCShutdownHook extends Thread {
private static final Logger LOGGER = Logger.getLogger(NCShutdownHook.class.getName());
+ private static final long SHUTDOWN_WAIT_TIME = 10 * 60 * 1000L;
+ private final Thread watchDog;
private final NodeControllerService nodeControllerService;
+ private volatile Thread shutdownHookThread;
+
public NCShutdownHook(NodeControllerService nodeControllerService) {
+ super("ShutdownHook-" + nodeControllerService.getId());
this.nodeControllerService = nodeControllerService;
+ watchDog = new Thread(watch(), "ShutdownHookWatchDog-" + nodeControllerService.getId());
+ }
+
+ private Runnable watch() {
+ return () -> {
+ try {
+ shutdownHookThread.join(SHUTDOWN_WAIT_TIME); // 10 min
+ if (shutdownHookThread.isAlive()) {
+ try {
+ LOGGER.info("Watchdog is angry. Killing shutdown hook");
+ } finally {
+ Runtime.getRuntime().halt(66); // NOSONAR last resort
+ }
+ }
+ } catch (Throwable th) { // NOSONAR must catch them all
+ Runtime.getRuntime().halt(77); // NOSONAR last resort
+ }
+ };
}
@Override
public void run() {
- LOGGER.info("Shutdown hook in progress");
try {
+ try {
+ LOGGER.info("Shutdown hook called");
+ } catch (Throwable th) {//NOSONAR
+ }
+ shutdownHookThread = Thread.currentThread();
+ watchDog.start();
nodeControllerService.stop();
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception in executing shutdown hook", e);
+ } catch (Throwable th) { // NOSONAR... This is fine since this is shutdwon hook
+ LOGGER.log(Level.WARNING, "Exception in executing shutdown hook", th);
}
}
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b1f39f7..6b97b31 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -170,22 +170,27 @@
}
public NodeControllerService(NCConfig config, INCApplication application) throws IOException, CmdLineException {
- this.ncConfig = config;
- this.configManager = ncConfig.getConfigManager();
+ ncConfig = config;
+ configManager = ncConfig.getConfigManager();
if (application == null) {
throw new IllegalArgumentException("INCApplication cannot be null");
}
configManager.processConfig();
this.application = application;
id = ncConfig.getNodeId();
-
- ioManager =
- new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
if (id == null) {
throw new HyracksException("id not set");
}
-
lccm = new LifeCycleComponentManager();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager());
+ }
+ // Set shutdown hook before so it doesn't have the same uncaught exception handler
+ Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
+ Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
+ ioManager =
+ new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
+
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
preDistributedJobActivityClusterGraphMap = new Hashtable<>();
@@ -263,11 +268,6 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager());
- }
- Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
- Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
ipc = new IPCSystem(new InetSocketAddress(ncConfig.getClusterListenAddress(), ncConfig.getClusterListenPort()),
new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
ipc.start();
@@ -276,10 +276,8 @@
ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(),
ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE);
netManager.start();
-
startApplication();
init();
-
datasetNetworkManager.start();
if (messagingNetManager != null) {
messagingNetManager.start();
@@ -394,6 +392,8 @@
heartbeatTask.cancel();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
shuttedDown = true;
+ } else {
+ LOGGER.log(Level.SEVERE, "Double shutdown calls!!", new Exception("Double shutdown calls"));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 260312b..85ba115 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -23,6 +23,9 @@
public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
+ private static final int MAX_SIZE = 1024 * 1024 * 64;
+ private static final double BUFFER_INCREMENT_FACTOR = 1.5;
+
public ByteArrayAccessibleOutputStream() {
super();
}
@@ -94,7 +97,10 @@
private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = buf.length;
- int newCapacity = oldCapacity << 1;
+ if (oldCapacity == MAX_SIZE) {
+ throw new IllegalArgumentException("Buffer is too large...");
+ }
+ int newCapacity = Math.min((int) (oldCapacity * BUFFER_INCREMENT_FACTOR), MAX_SIZE);
if (newCapacity - minCapacity < 0) {
newCapacity = minCapacity;
}