[NO ISSUE][MTD] Let active listeners acquire suspend locks

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Let each active listener acquire its suspend locks.

Change-Id: I38cf6e9107ee5ce6a1084dca13708ea0153e9e56
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10985
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 5f7d65e..cc4b25f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -44,12 +44,14 @@
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.active.message.StopRuntimeParameters;
+import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.metadata.api.IActiveEntityController;
@@ -58,6 +60,7 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
@@ -704,6 +707,29 @@
         return new RecoveryTask(appCtx, this, retryPolicyFactory);
     }
 
+    public void acquireSuspendLocks(MetadataProvider metadataProvider, Dataset targetDataset)
+            throws AlgebricksException {
+        // write lock the listener
+        // exclusive lock all the datasets (except the target dataset)
+        IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+        DataverseName dataverseName = entityId.getDataverseName();
+        String entityName = entityId.getEntityName();
+        lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName);
+        acquireSuspendDatasetsLocks(metadataProvider, lockManager, targetDataset);
+    }
+
+    protected void acquireSuspendDatasetsLocks(MetadataProvider metadataProvider, IMetadataLockManager lockManager,
+            Dataset targetDataset) throws AlgebricksException {
+        for (Dataset dataset : getDatasets()) {
+            if (targetDataset != null && targetDataset.equals(dataset)) {
+                // DDL operation already acquired the proper lock for the operation
+                continue;
+            }
+            lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataset.getDataverseName(),
+                    dataset.getDatasetName());
+        }
+    }
+
     @Override
     public String toString() {
         return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\","
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 8b74e07..0d63bca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -22,7 +22,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
@@ -30,10 +29,8 @@
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveNotificationHandler;
 import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.commons.lang3.tuple.Pair;
@@ -278,30 +275,13 @@
     public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider,
             Dataset targetDataset) {
         try {
-            // write lock the listener
-            // exclusive lock all the datasets (except the target dataset)
-            IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
-            DataverseName dataverseName = listener.getEntityId().getDataverseName();
-            String entityName = listener.getEntityId().getEntityName();
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Suspending " + listener.getEntityId());
-            }
-            LOGGER.log(level, "Acquiring locks");
-            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName);
-            Set<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
-            for (Dataset dataset : datasets) {
-                if (targetDataset != null && targetDataset.equals(dataset)) {
-                    // DDL operation already acquired the proper lock for the operation
-                    continue;
-                }
-                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
-                        dataset.getDataverseName(), dataset.getDatasetName());
-            }
-            LOGGER.log(level, "locks acquired");
+            EntityId entityId = listener.getEntityId();
+            LOGGER.log(level, "Suspending {}", entityId);
+            LOGGER.log(level, "Acquiring locks for {}", entityId);
+            ((ActiveEntityEventsListener) listener).acquireSuspendLocks(metadataProvider, targetDataset);
+            LOGGER.log(level, "locks acquired for {}", entityId);
             ((ActiveEntityEventsListener) listener).suspend(metadataProvider);
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, listener.getEntityId() + " suspended");
-            }
+            LOGGER.log(level, "{} suspended", entityId);
         } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Suspend active failed", th);
             ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);