Run Active Shutdown On Separate Thread
- As feed shutdown can be slow, do it on another thread to not tie up
worker.
- use nc thread executor for feed adapter thread
- error handling
Change-Id: I8fd9bc454b290420682160364ac78e4b91a9abc3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1223
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index bd6dae9..b15cfca 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -29,11 +30,14 @@
public class ActiveManager {
private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
+ private final Executor executor;
private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
private final ConcurrentFramePool activeFramePool;
private final String nodeId;
- public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException {
+ public ActiveManager(Executor executor, String nodeId, long activeMemoryBudget, int frameSize)
+ throws HyracksDataException {
+ this.executor = executor;
this.nodeId = nodeId;
this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
this.runtimes = new ConcurrentHashMap<>();
@@ -69,7 +73,7 @@
stopRuntime(message);
break;
default:
- LOGGER.warn("Unknown message type received");
+ LOGGER.warn("Unknown message type received: " + message.getKind());
}
}
@@ -79,12 +83,14 @@
if (runtime == null) {
LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId);
} else {
- try {
- runtime.stop();
- } catch (HyracksDataException | InterruptedException e) {
- // TODO(till) Figure out a better way to handle failure to stop a runtime
- LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
- }
+ executor.execute(() -> {
+ try {
+ runtime.stop();
+ } catch (Exception e) {
+ // TODO(till) Figure out a better way to handle failure to stop a runtime
+ LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+ }
+ });
}
}
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 1cda298..7f25896 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -55,15 +55,10 @@
@Override
public final void stop() throws HyracksDataException, InterruptedException {
- try {
- abort();
- } finally {
- if (!done) {
- synchronized (this) {
- while (!done) {
- wait();
- }
- }
+ abort();
+ synchronized (this) {
+ while (!done) {
+ wait();
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 343fdb3..ed081b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -216,7 +216,7 @@
isShuttingdown = false;
- activeManager = new ActiveManager(ncApplicationContext.getNodeId(),
+ activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 424f2dc..7f5372b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -18,13 +18,12 @@
*/
package org.apache.asterix.external.feed.runtime;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.log4j.Logger;
/**
@@ -42,45 +41,42 @@
private final int partition; // The partition number
- private final ExecutorService executorService; // Executor service to run/shutdown the adapter executor
+ private final IHyracksTaskContext ctx;
private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed
+ private Future<?> execution;
+
private volatile boolean done = false;
private volatile boolean failed = false;
- public AdapterRuntimeManager(EntityId entityId, FeedAdapter feedAdapter, IFrameWriter writer, int partition) {
+ public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
+ IFrameWriter writer, int partition) {
+ this.ctx = ctx;
this.feedId = entityId;
this.feedAdapter = feedAdapter;
this.partition = partition;
this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
- this.executorService = Executors.newSingleThreadExecutor();
}
public void start() {
- executorService.execute(adapterExecutor);
+ execution = ctx.getExecutorService().submit(adapterExecutor);
}
public void stop() throws InterruptedException {
- boolean stopped = false;
try {
- stopped = feedAdapter.stop();
- } catch (Exception exception) {
- LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
- } finally {
- if (stopped) {
+ if (feedAdapter.stop()) {
// stop() returned true, we wait for the process termination
- executorService.shutdown();
- try {
- executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
- throw e;
- }
+ execution.get();
} else {
// stop() returned false, we try to force shutdown
- executorService.shutdownNow();
+ execution.cancel(true);
}
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+ throw e;
+ } catch (Exception exception) {
+ LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 7c8fe14..afe87c0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -74,7 +74,7 @@
// create the distributor
frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition);
// create adapter runtime manager
- adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition);
+ adapterRuntimeManager = new AdapterRuntimeManager(ctx, feedId, adapter, frameDistributor, partition);
// create and register the runtime
ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition);
ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);