Shutdown ActiveManager Before Killing Thread Executors
Avoid race conditions with start / stop of active runtimes by stopping
the ActiveManager & and started runtimes before terminating the thread
executor
Change-Id: I45e83b0378198f80297fd2741969507741914dea
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1594
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index b15cfca..44d6dae 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,11 +18,19 @@
*/
package org.apache.asterix.active;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.common.api.ThreadExecutor;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
@@ -30,12 +38,15 @@
public class ActiveManager {
private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
- private final Executor executor;
- private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
+ private static final int SHUTDOWN_TIMEOUT_SECS = 60;
+
+ private final ThreadExecutor executor;
+ private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
private final ConcurrentFramePool activeFramePool;
private final String nodeId;
+ private volatile boolean shutdown;
- public ActiveManager(Executor executor, String nodeId, long activeMemoryBudget, int frameSize)
+ public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize)
throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
@@ -47,11 +58,11 @@
return activeFramePool;
}
- public void registerRuntime(IActiveRuntime runtime) {
- ActiveRuntimeId id = runtime.getRuntimeId();
- if (!runtimes.containsKey(id)) {
- runtimes.put(id, runtime);
+ public void registerRuntime(IActiveRuntime runtime) throws HyracksDataException {
+ if (shutdown) {
+ throw new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_SHUTDOWN);
}
+ runtimes.putIfAbsent(runtime.getRuntimeId(), runtime);
}
public void deregisterRuntime(ActiveRuntimeId id) {
@@ -77,6 +88,30 @@
}
}
+ public void shutdown() {
+ LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+ Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
+ shutdown = true;
+ runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> {
+ // we may already have been stopped- only stop once
+ stopIfRunning(runtimeId, runtime);
+ return null;
+ })));
+ stopFutures.entrySet().parallelStream().forEach(entry -> {
+ try {
+ entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted waiting to stop runtime: " + entry.getKey());
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ LOGGER.warn("Exception while stopping runtime: " + entry.getKey(), e);
+ } catch (TimeoutException e) {
+ LOGGER.warn("Timed out waiting to stop runtime: " + entry.getKey(), e);
+ }
+ });
+ LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+ }
+
private void stopRuntime(ActiveManagerMessage message) {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -85,7 +120,7 @@
} else {
executor.execute(() -> {
try {
- runtime.stop();
+ stopIfRunning(runtimeId, runtime);
} catch (Exception e) {
// TODO(till) Figure out a better way to handle failure to stop a runtime
LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
@@ -93,4 +128,14 @@
});
}
}
+
+ private void stopIfRunning(ActiveRuntimeId runtimeId, IActiveRuntime runtime)
+ throws HyracksDataException, InterruptedException {
+ if (runtimes.remove(runtimeId) != null) {
+ runtime.stop();
+ } else {
+ LOGGER.info("Not stopping already stopped runtime " + runtimeId);
+ }
+ }
+
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 55ea971..183a0ed 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -290,6 +290,11 @@
}
@Override
+ public void preStop() throws Exception {
+ activeManager.shutdown();
+ }
+
+ @Override
public void deinitialize() throws HyracksDataException {
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index ce33cef..aac7bc3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -150,6 +150,11 @@
}
@Override
+ public void preStop() throws Exception {
+ runtimeContext.preStop();
+ }
+
+ @Override
public void startupCompleted() throws Exception {
// Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 5df39db..e65759c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -6747,7 +6747,7 @@
<test-case FilePath="load">
<compilation-unit name="file-not-found">
<output-dir compare="Text">none</output-dir>
- <expected-error>org.apache.hyracks.api.exceptions.HyracksDataException: ASX3077: bla: path not found</expected-error>
+ <expected-error>ASX3077: bla: path not found</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="user-defined-functions">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
index 548d714..da4da6b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IAppRuntimeContext.java
@@ -50,6 +50,8 @@
ITransactionSubsystem getTransactionSubsystem();
+ void preStop() throws Exception;
+
boolean isShuttingdown();
ILSMIOOperationScheduler getLSMIOScheduler();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
index 4bc3d82..03cead0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
@@ -37,7 +37,7 @@
executorService.execute(command);
}
- public Future<? extends Object> submit(Callable<? extends Object> command) {
+ public <T> Future<T> submit(Callable<T> command) {
return executorService.submit(command);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 6748287..ad9b4af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -165,6 +165,7 @@
public static final int UTIL_FILE_SYSTEM_WATCHER_NO_FILES_FOUND = 3076;
public static final int UTIL_LOCAL_FILE_SYSTEM_UTILS_PATH_NOT_FOUND = 3077;
public static final int UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER = 3078;
+ public static final int ACTIVE_MANAGER_SHUTDOWN = 3079;
private ErrorCode() {
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index c4698df..7e439dd 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -151,3 +151,4 @@
3076 = %1$s: no files found
3077 = %1$s: path not found
3078 = Cannot obtain hdfs scheduler
+3079 = Cannot register runtime, active manager has been shutdown
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 44b1eb5..5b35095 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -22,5 +22,7 @@
public interface INCApplication extends IApplication {
+ void preStop() throws Exception; //NOSONAR
+
NodeCapacity getCapacity();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 1f2c7a5..3aa95b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -70,7 +70,7 @@
*/
@Deprecated
public HyracksException(Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
+ this(ErrorMessageUtil.NONE, UNKNOWN, String.valueOf(cause), cause, null);
}
/**
@@ -78,7 +78,7 @@
*/
@Deprecated
public HyracksException(Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
+ this(ErrorMessageUtil.NONE, UNKNOWN, String.valueOf(cause), cause, nodeId);
}
/**
@@ -102,7 +102,7 @@
}
public HyracksException(Throwable cause, int errorCode, Serializable... params) {
- this(ErrorMessageUtil.NONE, errorCode, cause.getMessage(), cause, null, params);
+ this(ErrorMessageUtil.NONE, errorCode, String.valueOf(cause), cause, null, params);
}
public HyracksException(String component, int errorCode, String message, Serializable... params) {
@@ -110,7 +110,7 @@
}
public HyracksException(String component, int errorCode, Throwable cause, Serializable... params) {
- this(component, errorCode, cause.getMessage(), cause, null, params);
+ this(component, errorCode, String.valueOf(cause), cause, null, params);
}
public HyracksException(String component, int errorCode, String message, Throwable cause, Serializable... params) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 5abc2e0..0835551 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -289,6 +289,7 @@
}
private void stopApplication() throws Exception {
+
application.stop();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 869ab5b..4a5805e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -54,6 +54,11 @@
}
@Override
+ public void preStop() throws Exception {
+ // no-op
+ }
+
+ @Override
public NodeCapacity getCapacity() {
int allCores = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
return new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b893d26..14d221e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -336,6 +336,7 @@
public synchronized void stop() throws Exception {
if (!shuttedDown) {
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ application.preStop();
executor.shutdownNow();
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing with abnormal shutdown");
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index e34e551..fe3c02c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -372,10 +372,8 @@
} finally {
collector.close();
}
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index e99e3b2..6986e3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -44,6 +44,11 @@
}
@Override
+ public void preStop() throws Exception {
+ // No-op
+ }
+
+ @Override
public NodeCapacity getCapacity() {
return new NodeCapacity(Runtime.getRuntime().maxMemory(), Runtime.getRuntime().availableProcessors() - 1);
}