[NO ISSUE][ING] Caller checks for active jobs failures

- user model changes: no
- storage format changes: no
- interface changes: yes
  -IActiveEntityEventSubscriber.sync() does not throw failure anymore
   and caller is responsible for checking for failures.

details:
- Previously, certain kinds of failures are sometimes thrown
  and sometimes not when syncing a subscriber. This lead to
  confusions and bugs. After this change, the caller to sync
  is responsible for checking for failures.

Change-Id: I85146028be70f4631d1ef2696489a4624bf23ad4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1986
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
index f9357b4..142e84b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventSubscriber.java
@@ -43,9 +43,9 @@
     /**
      * Wait until the terminal event has been received
      *
-     * @throws Exception
+     * @throws InterruptedException
      */
-    void sync() throws HyracksDataException, InterruptedException;
+    void sync() throws InterruptedException;
 
     /**
      * callback upon successful subscription
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 3b4b974..368c2da 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -507,8 +507,13 @@
                     .submit(() -> rt.resumeOrRecover(metadataProvider));
             try {
                 subscriber.sync();
-            } catch (Exception e) {
+                if (subscriber.getFailure() != null) {
+                    LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId,
+                            subscriber.getFailure());
+                }
+            } catch (InterruptedException e) {
                 LOGGER.log(Level.WARNING, "Failure while attempting to resume " + entityId, e);
+                Thread.currentThread().interrupt();
                 throw HyracksDataException.create(e);
             }
         } finally {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 124e56e..b710f2b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -90,7 +90,7 @@
                     ((QueryTranslator) statementExecutor).getSessionOutput(), mdProvider, feed, feedConnections,
                     compilationProvider, storageComponentProvider, statementExecutorFactory, hcc);
             JobSpecification feedJob = jobInfo.getLeft();
-            IActiveEntityEventSubscriber eventSubscriber =
+            WaitForStateSubscriber eventSubscriber =
                     new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING));
             feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
             // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
@@ -99,6 +99,9 @@
             boolean wait = Boolean.parseBoolean(mdProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
             JobUtils.runJob(hcc, feedJob, false);
             eventSubscriber.sync();
+            if (eventSubscriber.getFailure() != null) {
+                throw eventSubscriber.getFailure();
+            }
             if (wait) {
                 IActiveEntityEventSubscriber stoppedSubscriber = new WaitForStateSubscriber(this,
                         EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 97283b6..e03ee6e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -753,8 +753,8 @@
         listener.onStart(Behavior.FAIL_COMPILE);
         WaitForStateSubscriber tempFailSubscriber =
                 new WaitForStateSubscriber(listener, EnumSet.of(ActivityState.TEMPORARILY_FAILED));
-        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE,
-                Collections.singletonList(new HyracksDataException("Runtime Failure")));
+        List<Exception> exceptions = Collections.singletonList(new HyracksDataException("Runtime Failure"));
+        clusterController.jobFinish(listener.getJobId(), JobStatus.FAILURE, exceptions);
         // recovery is ongoing
         listener.onStart(Behavior.STEP_SUCCEED);
         tempFailSubscriber.sync();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 961b731..4de5813 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -51,10 +51,13 @@
 
     @Override
     protected void doStart(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
-        IActiveEntityEventSubscriber eventSubscriber =
+        WaitForStateSubscriber eventSubscriber =
                 new WaitForStateSubscriber(this, Collections.singleton(ActivityState.RUNNING));
         try {
             eventSubscriber.sync();
+            if (eventSubscriber.getFailure() != null) {
+                throw eventSubscriber.getFailure();
+            }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 905df72..c8d4b53 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -129,7 +129,7 @@
         try {
             subscriber.sync();
             if (subscriber.getFailure() != null) {
-                throw HyracksDataException.create(subscriber.getFailure());
+                throw subscriber.getFailure();
             }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index e21f9eb..880c4b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -20,7 +20,6 @@
 
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
 
@@ -48,12 +47,9 @@
     }
 
     @Override
-    public void sync() throws HyracksDataException, InterruptedException {
+    public void sync() throws InterruptedException {
         synchronized (listener) {
             while (!done) {
-                if (failure != null) {
-                    throw HyracksDataException.create(failure);
-                }
                 listener.wait();
             }
         }