Merge branch 'gerrit/trinity' into 'gerrit/goldfish'

Ext-ref: MB-64109
Change-Id: I0b42352d9dc60b34f4359e7f61c61fea5bbb419e
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 0dc761b..ac636b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -105,6 +105,7 @@
     protected int numDeRegistered;
     protected volatile RecoveryTask rt;
     protected volatile boolean suspended = false;
+    private long suspendCount; // bumped each time a suspension proceeds; read via getSuspendCount()
     // failures
     protected Exception jobFailure;
     protected Exception resumeFailure;
@@ -265,6 +266,10 @@
         return jobId;
     }
 
+    public long getSuspendCount() { // NOTE(review): plain read of a non-volatile field; confirm callers synchronize
+        return suspendCount;
+    }
+
     @Override
     public String getStats() {
         return stats;
@@ -568,6 +573,7 @@
             LOGGER.log(level, "{} waiting for ongoing activities", jobId);
             waitForNonTransitionState();
             LOGGER.log(level, "{} proceeding with suspension. current state is {}", jobId, state);
+            suspendCount++;
             if (state == ActivityState.STOPPED) {
                 suspended = true;
                 return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index a201ec8..39c1125 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -121,6 +121,7 @@
     protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
         LOGGER.log(level, "Actual Recovery task has started");
         Exception failure;
+        long prevSuspendCount;
         do {
             synchronized (listener) {
                 while (!cancelRecovery && !canStartRecovery()) {
@@ -153,6 +154,7 @@
                 listener.setState(ActivityState.TEMPORARILY_FAILED);
                 failure = e;
             } finally {
+                prevSuspendCount = listener.getSuspendCount();
                 releaseRecoveryLocks(metadataProvider);
             }
         } while (policy.retry(failure));
@@ -173,20 +175,28 @@
         }
         IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
         IMetadataLockUtil lockUtil = metadataProvider.getApplicationContext().getMetadataLockUtil();
+        boolean retryRecovery = false;
         try {
             acquirePostRecoveryLocks(lockManager, lockUtil);
             synchronized (listener) {
                 if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
-                    LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
-                    listener.setState(ActivityState.STOPPED);
-                    listener.setRunning(metadataProvider, false);
+                    if (prevSuspendCount == listener.getSuspendCount()) {
+                        LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
+                        listener.setState(ActivityState.STOPPED);
+                        listener.setRunning(metadataProvider, false);
+                    } else {
+                        LOGGER.log(level,
+                                "Retrying recovery on non-retryable failure due to interleaving suspend/resume for {}",
+                                listener.getEntityId());
+                        retryRecovery = true;
+                    }
                 }
                 listener.notifyAll();
             }
         } finally {
             releasePostRecoveryLocks();
         }
-        return null;
+        return retryRecovery ? doRecover(policy) : null;
     }
 
     protected void acquireRecoveryLocks(IMetadataLockManager lockManager, IMetadataLockUtil lockUtil)