Merge branch 'gerrit/trinity' into 'gerrit/goldfish'

Ext-ref: MB-63819
Change-Id: I9139a47d2170ac8538fb0dcc1e9001cd81c04aee
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 5fe082b..01e47b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
@@ -68,23 +69,27 @@
 
     @Override
     protected void handle(ActiveEvent event) {
+        resolveListenerForEvent(event).ifPresent(listener -> listener.notify(event));
+    }
+
+    private synchronized Optional<IActiveEntityEventsListener> resolveListenerForEvent(ActiveEvent event) {
         JobId jobId = event.getJobId();
         Kind eventKind = event.getEventKind();
         EntityId entityId = jobId2EntityId.get(jobId);
+        IActiveEntityEventsListener listener = null;
         if (entityId != null) {
-            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+            listener = entityEventListeners.get(entityId);
             if (eventKind == Kind.JOB_FINISHED) {
                 LOGGER.debug("removing ingestion job {}", jobId);
                 jobId2EntityId.remove(jobId);
             }
-            if (listener != null) {
-                listener.notify(event);
-            } else {
-                LOGGER.debug("listener not found for entity {} on event={}", entityId, event);
+            if (listener == null) {
+                LOGGER.debug("listener not found for entity {} on event {} for job {}", entityId, event, jobId);
             }
         } else {
-            LOGGER.error("entity not found for event {}", event);
+            LOGGER.log(Level.ERROR, "entity not found for event {} for job {}", eventKind, jobId);
         }
+        return Optional.ofNullable(listener);
     }
 
     // *** IJobLifecycleListener
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index 85019c5..b07cb71 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -31,7 +31,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
     protected final String name;
     private final LinkedBlockingQueue<T> eventInbox;
-    private volatile Thread executorThread;
+    private final Thread executorThread;
     private volatile boolean stopped = false;
 
     public SingleThreadEventProcessor(String threadName) {
@@ -43,18 +43,22 @@
 
     @Override
     public final void run() {
-        LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
+        LOGGER.info("Started {}", name);
         while (!stopped) {
             try {
                 T event = eventInbox.take();
                 handle(event);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
+                if (!stopped) {
+                    LOGGER.warn("Interrupt while waiting for an event and !stopped");
+                }
+                break;
             } catch (Exception e) {
-                LOGGER.log(Level.ERROR, "Error handling an event", e);
+                LOGGER.error("Error handling an event", e);
             }
         }
-        LOGGER.log(Level.WARN, "Stopped " + Thread.currentThread().getName());
+        LOGGER.info("Stopped {}", name);
     }
 
     protected abstract void handle(T event) throws Exception; //NOSONAR