Merge branch 'gerrit/trinity' into 'gerrit/goldfish'
Ext-ref: MB-64109
Change-Id: I0b42352d9dc60b34f4359e7f61c61fea5bbb419e
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 0dc761b..ac636b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -105,6 +105,7 @@
protected int numDeRegistered;
protected volatile RecoveryTask rt;
protected volatile boolean suspended = false;
+ private long suspendCount;
// failures
protected Exception jobFailure;
protected Exception resumeFailure;
@@ -265,6 +266,10 @@
return jobId;
}
+ public long getSuspendCount() { // NOTE(review): plain (non-volatile) long; confirm readers hold the listener monitor
+ return this.suspendCount; // incremented each time a suspension proceeds past the wait for ongoing activities
+ }
+
@Override
public String getStats() {
return stats;
@@ -568,6 +573,7 @@
LOGGER.log(level, "{} waiting for ongoing activities", jobId);
waitForNonTransitionState();
LOGGER.log(level, "{} proceeding with suspension. current state is {}", jobId, state);
+ suspendCount++;
if (state == ActivityState.STOPPED) {
suspended = true;
return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index a201ec8..39c1125 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -121,6 +121,7 @@
protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
LOGGER.log(level, "Actual Recovery task has started");
Exception failure;
+ long prevSuspendCount;
do {
synchronized (listener) {
while (!cancelRecovery && !canStartRecovery()) {
@@ -153,6 +154,7 @@
listener.setState(ActivityState.TEMPORARILY_FAILED);
failure = e;
} finally {
+ prevSuspendCount = listener.getSuspendCount();
releaseRecoveryLocks(metadataProvider);
}
} while (policy.retry(failure));
@@ -173,20 +175,28 @@
}
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
IMetadataLockUtil lockUtil = metadataProvider.getApplicationContext().getMetadataLockUtil();
+ boolean retryRecovery = false;
try {
acquirePostRecoveryLocks(lockManager, lockUtil);
synchronized (listener) {
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
- LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
- listener.setState(ActivityState.STOPPED);
- listener.setRunning(metadataProvider, false);
+ if (prevSuspendCount == listener.getSuspendCount()) {
+ LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
+ listener.setState(ActivityState.STOPPED);
+ listener.setRunning(metadataProvider, false);
+ } else {
+ LOGGER.log(level,
+ "Retrying recovery on non-retryable failure due to interleaving suspend/resume for {}",
+ listener.getEntityId());
+ retryRecovery = true;
+ }
}
listener.notifyAll();
}
} finally {
releasePostRecoveryLocks();
}
- return null;
+ return retryRecovery ? doRecover(policy) : null;
}
protected void acquireRecoveryLocks(IMetadataLockManager lockManager, IMetadataLockUtil lockUtil)