[NO ISSUE][OTH] Make Active Recovery Task Extensible
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Allow active entity listeners to create their own
recovery tasks.
- Make lock acquisition in recovery task extensible.
Change-Id: I801eec74f7c1723e8243fe0f36db6148638bde35
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3538
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 3b847b3..882afc5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -366,7 +366,7 @@
ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
setState(ActivityState.TEMPORARILY_FAILED);
LOGGER.log(level, "Recovery task has been submitted");
- rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+ rt = createRecoveryTask();
executor.submit(rt.recover());
}
}
@@ -685,6 +685,10 @@
return suspended;
}
+ protected RecoveryTask createRecoveryTask() {
+ return new RecoveryTask(appCtx, this, retryPolicyFactory);
+ }
+
@Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\","
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 0172b28..a1989fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -119,13 +119,7 @@
}
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
try {
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
- listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
- for (Dataset dataset : listener.getDatasets()) {
- lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
- lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
- DatasetUtil.getFullyQualifiedName(dataset));
- }
+ acquireRecoveryLocks(lockManager);
synchronized (listener) {
try {
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
@@ -143,7 +137,7 @@
listener.setState(ActivityState.TEMPORARILY_FAILED);
failure = e;
} finally {
- metadataProvider.getLocks().reset();
+ releaseRecoveryLocks(metadataProvider);
}
}
// Recovery task is essntially over now either through failure or through cancellation(stop)
@@ -163,12 +157,7 @@
}
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
try {
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
- listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
- for (Dataset dataset : listener.getDatasets()) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(),
- DatasetUtil.getFullyQualifiedName(dataset));
- }
+ acquirePostRecoveryLocks(lockManager);
synchronized (listener) {
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
@@ -178,8 +167,35 @@
listener.notifyAll();
}
} finally {
- metadataProvider.getLocks().reset();
+ releasePostRecoveryLocks();
}
return null;
}
+
+ protected void acquireRecoveryLocks(IMetadataLockManager lockManager) throws AlgebricksException {
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+ listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+ for (Dataset dataset : listener.getDatasets()) {
+ lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+ DatasetUtil.getFullyQualifiedName(dataset));
+ }
+ }
+
+ protected void releaseRecoveryLocks(MetadataProvider metadataProvider) {
+ metadataProvider.getLocks().reset();
+ }
+
+ protected void acquirePostRecoveryLocks(IMetadataLockManager lockManager) throws AlgebricksException {
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+ listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+ for (Dataset dataset : listener.getDatasets()) {
+ MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(),
+ DatasetUtil.getFullyQualifiedName(dataset));
+ }
+ }
+
+ protected void releasePostRecoveryLocks() {
+ metadataProvider.getLocks().reset();
+ }
}