[NO ISSUE][ACTIVE] Account for force stop while suspending

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- When a failure happens while trying to suspend ingestion,
  we will force stop the active job. If the job completes
  ungracefully, we set the listener state to TEMPORARILY_FAILED.
  However, since force to stop only waits for STOPPED state,
  the thread waiting for ingestion to be suspended will wait
  forever. This change accounts for such case and makes
  the force stop waits for TEMPORARILY_FAILED too.

Change-Id: Ib33f191be2b84d97a08e3bc6d607b0edbf35bed1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13144
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 0242ecd..ddd3d64 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -195,7 +195,7 @@
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "the job " + jobId + " finished");
+            LOGGER.log(level, "the job {} finished", jobId);
         }
         JobId lastJobId = jobId;
         if (numRegistered != numDeRegistered) {
@@ -208,7 +208,7 @@
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "The job finished with status: " + jobStatus);
+            LOGGER.log(level, "The job finished with status: {}", jobStatus);
         }
         if (!jobSuccessfullyTerminated(jobStatus)) {
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
@@ -440,8 +440,9 @@
 
     private void cancelJob(Throwable th) {
         cancelJobSafely(metadataProvider, th);
+        // we can come here due to a failure while in suspending state
         final WaitForStateSubscriber cancelSubscriber =
-                new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
+                new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.TEMPORARILY_FAILED));
         final Span span = Span.start(2, TimeUnit.MINUTES);
         InvokeUtil.doUninterruptibly(() -> {
             if (!cancelSubscriber.sync(span)) {
@@ -491,6 +492,7 @@
             forceStop(subscriber, ie);
             Thread.currentThread().interrupt();
         } catch (Throwable e) {
+            LOGGER.error("forcing active job stop due to", e);
             forceStop(subscriber, e);
         } finally {
             Thread.currentThread().setName(nameBefore);