[NO ISSUE][ING] Fix race between active recovery and rebalance
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- In certain cases, a rebalance suspends an active entity before
recovery of its active job starts.
- When that happens, the recovery task sometimes exits and the
active job is not resumed after the rebalance (see the sketch
below).
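The fix re-checks both the cancel flag and the entity state while
holding the listener monitor before resuming. Below is a minimal
sketch of that pattern; Listener, suspendForRebalance and recover are
simplified, hypothetical stand-ins for ActiveEntityEventsListener and
RecoveryTask, not the actual API.

public class RecoverySuspendRaceSketch {

    enum State { RUNNING, TEMPORARILY_FAILED, RECOVERING, SUSPENDED }

    static class Listener {
        State state = State.TEMPORARILY_FAILED; // job failed, awaiting recovery
        boolean cancelRecovery = false;         // set by a rebalance suspend
    }

    // Rebalance path: suspend the entity and cancel any pending recovery.
    static void suspendForRebalance(Listener listener) {
        synchronized (listener) {
            listener.cancelRecovery = true;
            listener.state = State.SUSPENDED;
            listener.notifyAll();
        }
    }

    // Recovery path: only proceed if recovery was not cancelled and the entity
    // is still TEMPORARILY_FAILED; both checks happen under the listener
    // monitor, so they cannot interleave with suspendForRebalance above.
    static void recover(Listener listener) {
        synchronized (listener) {
            if (!listener.cancelRecovery && listener.state == State.TEMPORARILY_FAILED) {
                listener.state = State.RECOVERING;
                // restarting the job would go here (listener.doStart in the real code)
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Listener listener = new Listener();
        Thread rebalance = new Thread(() -> suspendForRebalance(listener));
        Thread recovery = new Thread(() -> recover(listener));
        rebalance.start();
        recovery.start();
        rebalance.join();
        recovery.join();
        System.out.println("final state: " + listener.state);
    }
}

Because both threads take the same monitor, either the suspend runs
first and recovery sees the cancel flag and does nothing, or recovery
runs first and the suspend then acts on an entity that is already
RECOVERING, so the job is not left unresumed.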
Change-Id: I66edb73950bb82baa1a1dfd892cb4b23bb7046be
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2950
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 5d722e7..0172b28 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -47,7 +47,6 @@
private final IRetryPolicyFactory retryPolicyFactory;
private final MetadataProvider metadataProvider;
private final IClusterStateManager clusterStateManager;
- private Exception failure;
public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener,
IRetryPolicyFactory retryPolicyFactory) {
@@ -105,50 +104,46 @@
}
}
- protected Void doRecover(IRetryPolicy policy)
- throws AlgebricksException, HyracksDataException, InterruptedException {
+ protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
LOGGER.log(level, "Actual Recovery task has started");
- if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
- LOGGER.log(level, "but its state is not temp failure and so we're just returning");
- return null;
- }
- LOGGER.log(level, "calling the policy");
+ Exception failure = null;
while (policy.retry(failure)) {
synchronized (listener) {
- if (cancelRecovery) {
- return null;
- }
- while (clusterStateManager.getState() != ClusterState.ACTIVE) {
- if (cancelRecovery) {
- return null;
- }
+ while (!cancelRecovery && clusterStateManager.getState() != ClusterState.ACTIVE) {
listener.wait();
}
+ if (cancelRecovery) {
+ LOGGER.log(level, "Recovery has been cancelled");
+ return null;
+ }
}
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
- listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
- for (Dataset dataset : listener.getDatasets()) {
- lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
- lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
- DatasetUtil.getFullyQualifiedName(dataset));
- }
- synchronized (listener) {
- try {
- if (cancelRecovery) {
- return null;
- }
- listener.setState(ActivityState.RECOVERING);
- listener.doStart(metadataProvider);
- return null;
- } catch (Exception e) {
- LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
- listener.setState(ActivityState.TEMPORARILY_FAILED);
- failure = e;
- } finally {
- metadataProvider.getLocks().reset();
+ try {
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+ listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+ for (Dataset dataset : listener.getDatasets()) {
+ lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+ DatasetUtil.getFullyQualifiedName(dataset));
}
- listener.notifyAll();
+ synchronized (listener) {
+ try {
+ if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ listener.setState(ActivityState.RECOVERING);
+ listener.doStart(metadataProvider);
+ }
+ LOGGER.log(level, "Recovery completed successfully");
+ return null;
+ } finally {
+ listener.notifyAll();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
+ listener.setState(ActivityState.TEMPORARILY_FAILED);
+ failure = e;
+ } finally {
+ metadataProvider.getLocks().reset();
}
}
// Recovery task is essentially over now either through failure or through cancellation (stop)
@@ -160,6 +155,9 @@
// 1. set the state to permanent failure.
// 2. set the entity to not running to avoid auto recovery attempt
&& listener.getState() != ActivityState.SUSPENDED) {
+ LOGGER.log(level, "Recovery is cancelled because the current state {} is neither {} nor {}",
+ listener.getState(), ActivityState.TEMPORARILY_FAILED,
+ listener.getState() != ActivityState.SUSPENDED);
return null;
}
}
@@ -172,10 +170,7 @@
DatasetUtil.getFullyQualifiedName(dataset));
}
synchronized (listener) {
- if (cancelRecovery) {
- return null;
- }
- if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
listener.setState(ActivityState.STOPPED);
listener.setRunning(metadataProvider, false);
@@ -187,8 +182,4 @@
}
return null;
}
-
- public Exception getFailure() {
- return failure;
- }
}