Merge branch 'gerrit/trinity' into 'gerrit/goldfish'
Ext-ref: MB-63819
Change-Id: I9139a47d2170ac8538fb0dcc1e9001cd81c04aee
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 5fe082b..01e47b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
@@ -68,23 +69,27 @@
@Override
protected void handle(ActiveEvent event) {
+ resolveListenerForEvent(event).ifPresent(listener -> listener.notify(event));
+ }
+
+ private synchronized Optional<IActiveEntityEventsListener> resolveListenerForEvent(ActiveEvent event) {
JobId jobId = event.getJobId();
Kind eventKind = event.getEventKind();
EntityId entityId = jobId2EntityId.get(jobId);
+ IActiveEntityEventsListener listener = null;
if (entityId != null) {
- IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+ listener = entityEventListeners.get(entityId);
if (eventKind == Kind.JOB_FINISHED) {
LOGGER.debug("removing ingestion job {}", jobId);
jobId2EntityId.remove(jobId);
}
- if (listener != null) {
- listener.notify(event);
- } else {
- LOGGER.debug("listener not found for entity {} on event={}", entityId, event);
+ if (listener == null) {
+ LOGGER.debug("listener not found for entity {} on event {} for job {}", entityId, event, jobId);
}
} else {
- LOGGER.error("entity not found for event {}", event);
+ LOGGER.log(Level.ERROR, "entity not found for event {} for job {}", eventKind, jobId);
}
+ return Optional.ofNullable(listener);
}
// *** IJobLifecycleListener
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index 85019c5..b07cb71 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -31,7 +31,7 @@
private static final Logger LOGGER = LogManager.getLogger();
protected final String name;
private final LinkedBlockingQueue<T> eventInbox;
- private volatile Thread executorThread;
+ private final Thread executorThread;
private volatile boolean stopped = false;
public SingleThreadEventProcessor(String threadName) {
@@ -43,18 +43,22 @@
@Override
public final void run() {
- LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
+ LOGGER.info("Started {}", name);
while (!stopped) {
try {
T event = eventInbox.take();
handle(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ if (!stopped) {
+ LOGGER.warn("Interrupt while waiting for an event and !stopped");
+ }
+ break;
} catch (Exception e) {
- LOGGER.log(Level.ERROR, "Error handling an event", e);
+ LOGGER.error("Error handling an event", e);
}
}
- LOGGER.log(Level.WARN, "Stopped " + Thread.currentThread().getName());
+ LOGGER.info("Stopped {}", name);
}
protected abstract void handle(T event) throws Exception; //NOSONAR