[NO ISSUE][ING] Caller checks for active jobs failures
- user model changes: no
- storage format changes: no
- interface changes: yes
-IActiveEntityEventSubscriber.sync() does not throw failure anymore
and caller is responsible for checking for failures.
details:
- Previously, certain kinds of failures are sometimes thrown
and sometimes not when syncing a subscriber. This lead to
confusions and bugs. After this change, the caller to sync
is responsible for checking for failures.
Change-Id: I85146028be70f4631d1ef2696489a4624bf23ad4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1986
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index f9357b4..142e84b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -43,9 +43,9 @@
/**
* Wait until the terminal event has been received
*
- * @throws Exception
+ * @throws InterruptedException
*/
- void sync() throws HyracksDataException, InterruptedException;
+ void sync() throws InterruptedException;
/**
* callback upon successful subscription
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 3b4b974..368c2da 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -507,8 +507,13 @@
.submit(() -> rt.resumeOrRecover(metadataProvider));
try {
subscriber.sync();
- } catch (Exception e) {
+ if (subscriber.getFailure() != null) {
+ LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId,
+ subscriber.getFailure());
+ }
+ } catch (InterruptedException e) {
LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId, e);
+ Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
}
} finally {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 124e56e..b710f2b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -90,7 +90,7 @@
((QueryTranslator) statementExecutor).getSessionOutput(), mdProvider, feed, feedConnections,
compilationProvider, storageComponentProvider, statementExecutorFactory, hcc);
JobSpecification feedJob = jobInfo.getLeft();
- IActiveEntityEventSubscriber eventSubscriber =
+ WaitForStateSubscriber eventSubscriber =
new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING));
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
// TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
@@ -99,6 +99,9 @@
boolean wait = Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
JobUtils.runJob(hcc, feedJob, false);
eventSubscriber.sync();
+ if (eventSubscriber.getFailure() != null) {
+ throw eventSubscriber.getFailure();
+ }
if (wait) {
IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this,
EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 97283b6..e03ee6e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -753,8 +753,8 @@
listener.onStart(Behavior.FAIL_COMPILE);
WaitForStateSubscriber tempFailSubscriber =
new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
- clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
- Collections.singletonList(new HyracksDataException("Runtime Failure")));
+ List<Exception> exceptions = Collections.singletonList(new HyracksDataException("Runtime Failure"));
+ clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, exceptions);
// recovery is ongoing
listener.onStart(Behavior.STEP_SUCCEED);
tempFailSubscriber.sync();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 961b731..4de5813 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -51,10 +51,13 @@
@Override
protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
- IActiveEntityEventSubscriber eventSubscriber =
+ WaitForStateSubscriber eventSubscriber =
new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING));
try {
eventSubscriber.sync();
+ if (eventSubscriber.getFailure() != null) {
+ throw eventSubscriber.getFailure();
+ }
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 905df72..c8d4b53 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -129,7 +129,7 @@
try {
subscriber.sync();
if (subscriber.getFailure() != null) {
- throw HyracksDataException.create(subscriber.getFailure());
+ throw subscriber.getFailure();
}
} catch (Exception e) {
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index e21f9eb..880c4b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -20,7 +20,6 @@
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
@@ -48,12 +47,9 @@
}
@Override
- public void sync() throws HyracksDataException, InterruptedException {
+ public void sync() throws InterruptedException {
synchronized (listener) {
while (!done) {
- if (failure != null) {
- throw HyracksDataException.create(failure);
- }
listener.wait();
}
}