Run Active Shutdown On Separate Thread

- As feed shutdown can be slow, do it on another thread to not tie up
  worker.
- use nc thread executor for feed adapter thread
- error handling

Change-Id: I8fd9bc454b290420682160364ac78e4b91a9abc3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1223
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index bd6dae9..b15cfca 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -20,6 +20,7 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -29,11 +30,14 @@
 public class ActiveManager {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
+    private final Executor executor;
     private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
     private final ConcurrentFramePool activeFramePool;
     private final String nodeId;
 
-    public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException {
+    public ActiveManager(Executor executor, String nodeId, long activeMemoryBudget, int frameSize)
+            throws HyracksDataException {
+        this.executor = executor;
         this.nodeId = nodeId;
         this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
         this.runtimes = new ConcurrentHashMap<>();
@@ -69,7 +73,7 @@
                 stopRuntime(message);
                 break;
             default:
-                LOGGER.warn("Unknown message type received");
+                LOGGER.warn("Unknown message type received: " + message.getKind());
         }
     }
 
@@ -79,12 +83,14 @@
         if (runtime == null) {
             LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId);
         } else {
-            try {
-                runtime.stop();
-            } catch (HyracksDataException | InterruptedException e) {
-                // TODO(till) Figure out a better way to handle failure to stop a runtime
-                LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
-            }
+            executor.execute(() -> {
+                try {
+                    runtime.stop();
+                } catch (Exception e) {
+                    // TODO(till) Figure out a better way to handle failure to stop a runtime
+                    LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+                }
+            });
         }
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 1cda298..7f25896 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -55,15 +55,10 @@
 
     @Override
     public final void stop() throws HyracksDataException, InterruptedException {
-        try {
-            abort();
-        } finally {
-            if (!done) {
-                synchronized (this) {
-                    while (!done) {
-                        wait();
-                    }
-                }
+        abort();
+        synchronized (this) {
+            while (!done) {
+                wait();
             }
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 343fdb3..ed081b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -216,7 +216,7 @@
 
         isShuttingdown = false;
 
-        activeManager = new ActiveManager(ncApplicationContext.getNodeId(),
+        activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
                 feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
 
         if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 424f2dc..7f5372b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.log4j.Logger;
 
 /**
@@ -42,45 +41,42 @@
 
     private final int partition; // The partition number
 
-    private final ExecutorService executorService; // Executor service to run/shutdown the adapter executor
+    private final IHyracksTaskContext ctx;
 
     private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed
 
+    private Future<?> execution;
+
     private volatile boolean done = false;
     private volatile boolean failed = false;
 
-    public AdapterRuntimeManager(EntityId entityId, FeedAdapter feedAdapter, IFrameWriter writer, int partition) {
+    public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
+                                 IFrameWriter writer, int partition) {
+        this.ctx = ctx;
         this.feedId = entityId;
         this.feedAdapter = feedAdapter;
         this.partition = partition;
         this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
-        this.executorService = Executors.newSingleThreadExecutor();
     }
 
     public void start() {
-        executorService.execute(adapterExecutor);
+        execution = ctx.getExecutorService().submit(adapterExecutor);
     }
 
     public void stop() throws InterruptedException {
-        boolean stopped = false;
         try {
-            stopped = feedAdapter.stop();
-        } catch (Exception exception) {
-            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
-        } finally {
-            if (stopped) {
+            if (feedAdapter.stop()) {
                 // stop() returned true, we wait for the process termination
-                executorService.shutdown();
-                try {
-                    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
-                    throw e;
-                }
+                execution.get();
             } else {
                 // stop() returned false, we try to force shutdown
-                executorService.shutdownNow();
+                execution.cancel(true);
             }
+        } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+            throw e;
+        } catch (Exception exception) {
+            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 7c8fe14..afe87c0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -74,7 +74,7 @@
             // create the distributor
             frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition);
             // create adapter runtime manager
-            adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition);
+            adapterRuntimeManager = new AdapterRuntimeManager(ctx, feedId, adapter, frameDistributor, partition);
             // create and register the runtime
             ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition);
             ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);