[NO ISSUE][MTD] Let active listeners acquire suspend locks
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Let each active listener acquire its suspend locks.
Change-Id: I38cf6e9107ee5ce6a1084dca13708ea0153e9e56
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10985
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 5f7d65e..cc4b25f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -44,12 +44,14 @@
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.active.message.StopRuntimeParameters;
+import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.metadata.api.IActiveEntityController;
@@ -58,6 +60,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -704,6 +707,29 @@
return new RecoveryTask(appCtx, this, retryPolicyFactory);
}
+ public void acquireSuspendLocks(MetadataProvider metadataProvider, Dataset targetDataset)
+ throws AlgebricksException {
+ // write lock the listener
+ // exclusive lock all the datasets (except the target dataset)
+ IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+ DataverseName dataverseName = entityId.getDataverseName();
+ String entityName = entityId.getEntityName();
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName);
+ acquireSuspendDatasetsLocks(metadataProvider, lockManager, targetDataset);
+ }
+
+ protected void acquireSuspendDatasetsLocks(MetadataProvider metadataProvider, IMetadataLockManager lockManager,
+ Dataset targetDataset) throws AlgebricksException {
+ for (Dataset dataset : getDatasets()) {
+ if (targetDataset != null && targetDataset.equals(dataset)) {
+ // DDL operation already acquired the proper lock for the operation
+ continue;
+ }
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataset.getDataverseName(),
+ dataset.getDatasetName());
+ }
+ }
+
@Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\","
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8b74e07..0d63bca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -22,7 +22,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
@@ -30,10 +29,8 @@
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.commons.lang3.tuple.Pair;
@@ -278,30 +275,13 @@
public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider,
Dataset targetDataset) {
try {
- // write lock the listener
- // exclusive lock all the datasets (except the target dataset)
- IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
- DataverseName dataverseName = listener.getEntityId().getDataverseName();
- String entityName = listener.getEntityId().getEntityName();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Suspending " + listener.getEntityId());
- }
- LOGGER.log(level, "Acquiring locks");
- lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName);
- Set<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
- for (Dataset dataset : datasets) {
- if (targetDataset != null && targetDataset.equals(dataset)) {
- // DDL operation already acquired the proper lock for the operation
- continue;
- }
- lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
- dataset.getDataverseName(), dataset.getDatasetName());
- }
- LOGGER.log(level, "locks acquired");
+ EntityId entityId = listener.getEntityId();
+ LOGGER.log(level, "Suspending {}", entityId);
+ LOGGER.log(level, "Acquiring locks for {}", entityId);
+ ((ActiveEntityEventsListener) listener).acquireSuspendLocks(metadataProvider, targetDataset);
+ LOGGER.log(level, "locks acquired for {}", entityId);
((ActiveEntityEventsListener) listener).suspend(metadataProvider);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, listener.getEntityId() + " suspended");
- }
+ LOGGER.log(level, "{} suspended", entityId);
} catch (Throwable th) { // NOSONAR must halt in case of any failure
LOGGER.error("Suspend active failed", th);
ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);