[NO ISSUE][ING] Fix race between active recovery and rebalance

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- In certain cases, the active suspend issued by a rebalance starts
  before recovery of an active job starts.
- When that happens, the recovery task sometimes exits and the
  active job is not resumed after the rebalance (a minimal sketch of
  the guarded-wait fix follows the review trailers below).

Change-Id: I66edb73950bb82baa1a1dfd892cb4b23bb7046be
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2950
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
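
The fix hinges on re-checking the cancellation flag under the listener
monitor after the wait loop, so a suspend that lands while the recovery
task is waiting for the cluster to become ACTIVE is never lost. Below is
a minimal, self-contained sketch of that guarded-wait pattern; the class,
field, and method names are illustrative stand-ins, not AsterixDB code.

    import java.util.concurrent.TimeUnit;

    // Illustrative sketch only: cancelRecovery / clusterActive mimic the
    // roles of RecoveryTask.cancelRecovery and the cluster state check.
    public class GuardedRecovery {
        private final Object monitor = new Object();
        private boolean cancelRecovery; // set by the rebalance suspend path
        private boolean clusterActive;  // set when the cluster becomes ACTIVE

        void cancel() {
            synchronized (monitor) {
                cancelRecovery = true;
                monitor.notifyAll();
            }
        }

        void clusterBecameActive() {
            synchronized (monitor) {
                clusterActive = true;
                monitor.notifyAll();
            }
        }

        // Returns true if recovery may proceed, false if it was cancelled.
        // The cancel flag is re-read under the same monitor after waiting,
        // which is the essence of the fix.
        boolean awaitRecoveryWindow() throws InterruptedException {
            synchronized (monitor) {
                while (!cancelRecovery && !clusterActive) {
                    monitor.wait();
                }
                return !cancelRecovery;
            }
        }

        public static void main(String[] args) throws Exception {
            GuardedRecovery task = new GuardedRecovery();
            Thread suspender = new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException ignored) {
                }
                task.cancel(); // simulate the rebalance suspend winning the race
            });
            suspender.start();
            System.out.println(task.awaitRecoveryWindow() ? "recovering" : "cancelled");
            suspender.join();
        }
    }
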
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 5d722e7..0172b28 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -47,7 +47,6 @@
     private final IRetryPolicyFactory retryPolicyFactory;
     private final MetadataProvider metadataProvider;
     private final IClusterStateManager clusterStateManager;
-    private Exception failure;
 
     public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener,
             IRetryPolicyFactory retryPolicyFactory) {
@@ -105,50 +104,46 @@
         }
     }
 
-    protected Void doRecover(IRetryPolicy policy)
-            throws AlgebricksException, HyracksDataException, InterruptedException {
+    protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
         LOGGER.log(level, "Actual Recovery task has started");
-        if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(level, "but its state is not temp failure and so we're just returning");
-            return null;
-        }
-        LOGGER.log(level, "calling the policy");
+        Exception failure = null;
         while (policy.retry(failure)) {
             synchronized (listener) {
-                if (cancelRecovery) {
-                    return null;
-                }
-                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
-                    if (cancelRecovery) {
-                        return null;
-                    }
+                while (!cancelRecovery && clusterStateManager.getState() != ClusterState.ACTIVE) {
                     listener.wait();
                 }
+                if (cancelRecovery) {
+                    LOGGER.log(level, "Recovery has been cancelled");
+                    return null;
+                }
             }
             IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
-            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
-                    listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
-            for (Dataset dataset : listener.getDatasets()) {
-                lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
-                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
-                        DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            synchronized (listener) {
-                try {
-                    if (cancelRecovery) {
-                        return null;
-                    }
-                    listener.setState(ActivityState.RECOVERING);
-                    listener.doStart(metadataProvider);
-                    return null;
-                } catch (Exception e) {
-                    LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
-                    listener.setState(ActivityState.TEMPORARILY_FAILED);
-                    failure = e;
-                } finally {
-                    metadataProvider.getLocks().reset();
+            try {
+                lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                        listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+                for (Dataset dataset : listener.getDatasets()) {
+                    lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+                    lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                            DatasetUtil.getFullyQualifiedName(dataset));
                 }
-                listener.notifyAll();
+                synchronized (listener) {
+                    try {
+                        if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                            listener.setState(ActivityState.RECOVERING);
+                            listener.doStart(metadataProvider);
+                        }
+                        LOGGER.log(level, "Recovery completed successfully");
+                        return null;
+                    } finally {
+                        listener.notifyAll();
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
+                listener.setState(ActivityState.TEMPORARILY_FAILED);
+                failure = e;
+            } finally {
+                metadataProvider.getLocks().reset();
             }
         }
        // Recovery task is essentially over now, either through failure or through cancellation (stop)
@@ -160,6 +155,9 @@
                     // 1. set the state to permanent failure.
                     // 2. set the entity to not running to avoid auto recovery attempt
                     && listener.getState() != ActivityState.SUSPENDED) {
+                LOGGER.log(level, "Recovery is cancelled because the current state {} is neither {} nor {}",
+                        listener.getState(), ActivityState.TEMPORARILY_FAILED,
+                        ActivityState.SUSPENDED);
                 return null;
             }
         }
@@ -172,10 +170,7 @@
                         DatasetUtil.getFullyQualifiedName(dataset));
             }
             synchronized (listener) {
-                if (cancelRecovery) {
-                    return null;
-                }
-                if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
                     LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
                     listener.setState(ActivityState.STOPPED);
                     listener.setRunning(metadataProvider, false);
@@ -187,8 +182,4 @@
         }
         return null;
     }
-
-    public Exception getFailure() {
-        return failure;
-    }
 }
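
Note on the removed failure field: doRecover now keeps the last exception
in a local variable and passes it to policy.retry(failure), so the
getFailure() accessor had no remaining callers. The hypothetical sketch
below shows that loop shape with a stand-in RetryPolicy interface (not
the real IRetryPolicy from Hyracks); all names here are illustrative.

    // Stand-in retry policy: the first call sees lastFailure == null.
    interface RetryPolicy {
        boolean retry(Exception lastFailure);
    }

    // Allows a fixed number of attempts regardless of the failure kind.
    final class BoundedRetry implements RetryPolicy {
        private int remaining;

        BoundedRetry(int attempts) {
            this.remaining = attempts;
        }

        @Override
        public boolean retry(Exception lastFailure) {
            return remaining-- > 0;
        }
    }

    class RetryLoopSketch {
        static void recover(RetryPolicy policy, Runnable attempt) {
            Exception failure = null; // local, as in the patched doRecover
            while (policy.retry(failure)) {
                try {
                    attempt.run();
                    return; // success ends the loop; nothing escapes via a field
                } catch (Exception e) {
                    failure = e; // remembered only for the next retry decision
                }
            }
        }

        public static void main(String[] args) {
            recover(new BoundedRetry(3), () -> {
                throw new IllegalStateException("transient failure");
            });
            System.out.println("gave up after retries");
        }
    }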