[NO ISSUE][ING] Refactor Active Suspend/Resume Logic

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Refactor the logic for checking DDLs on connected datasets.
- Refactor suspend listener API to allow for suspend for a DDL
  on a dataset.
- Allow suspended active listeners to be unregistered. This is
  done to support removing suspended listeners on active entities
  that were dropped.

Change-Id: I38254582e08d97951a949f7327c8c3d7cf2ab51d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2999
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2d36bb2..ca610aa 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -85,6 +85,11 @@
     boolean isActive();
 
     /**
+     * @return true, if this {@link IActiveEntityEventsListener} is suspended. Otherwise false.
+     */
+    boolean isSuspended();
+
+    /**
      * unregister the listener upon deletion of entity
      *
      * @throws HyracksDataException
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 62f2c02..783d823 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -681,6 +681,11 @@
     }
 
     @Override
+    public synchronized boolean isSuspended() {
+        return suspended;
+    }
+
+    @Override
     public String toString() {
         return "{\"class\":\"" + getClass().getSimpleName() + "\"" + "\"entityId\":\"" + entityId + "\""
                 + "\"state\":\"" + state + "\"" + "}";
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 6eba4ea..a572e28 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -36,7 +36,6 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -219,7 +218,7 @@
         if (registeredListener == null) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId());
         }
-        if (registeredListener.isActive()) {
+        if (registeredListener.isActive() && !registeredListener.isSuspended()) {
             entityEventListeners.put(registeredListener.getEntityId(), registeredListener);
             throw new RuntimeDataException(ErrorCode.CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER, listener.getEntityId());
         }
@@ -251,8 +250,7 @@
         }
     }
 
-    public void suspend(MetadataProvider mdProvider)
-            throws AlgebricksException, HyracksDataException, InterruptedException {
+    public void suspend(MetadataProvider mdProvider) throws HyracksDataException {
         synchronized (this) {
             if (suspended) {
                 throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
@@ -260,54 +258,67 @@
             LOGGER.log(level, "Suspending active events handler");
             suspended = true;
         }
+        Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values();
+        for (IActiveEntityEventsListener listener : registeredListeners) {
+            suspendForDdlOrHalt(listener, mdProvider, null);
+        }
+    }
+
+    public void resume(MetadataProvider mdProvider) {
+        LOGGER.log(level, "Resuming active events handler");
+        for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
+            resumeOrHalt(listener, mdProvider);
+        }
+        synchronized (this) {
+            suspended = false;
+        }
+    }
+
+    public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider,
+            Dataset targetDataset) {
         try {
-            IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager();
-            Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values();
-            for (IActiveEntityEventsListener listener : registeredListeners) {
-                // write lock the listener
-                // exclusive lock all the datasets
-                String dataverseName = listener.getEntityId().getDataverse();
-                String entityName = listener.getEntityId().getEntityName();
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Suspending " + listener.getEntityId());
-                }
-                LOGGER.log(level, "Acquiring locks");
-                lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName);
-                List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
-                for (Dataset dataset : datasets) {
-                    lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
-                            DatasetUtil.getFullyQualifiedName(dataset));
-                }
-                LOGGER.log(level, "locks acquired");
-                ((ActiveEntityEventsListener) listener).suspend(mdProvider);
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, listener.getEntityId() + " suspended");
-                }
+            // write lock the listener
+            // exclusive lock all the datasets (except the target dataset)
+            IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+            String dataverseName = listener.getEntityId().getDataverse();
+            String entityName = listener.getEntityId().getEntityName();
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, "Suspending " + listener.getEntityId());
             }
-        } catch (Throwable th) {
+            LOGGER.log(level, "Acquiring locks");
+            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName + '.' + entityName);
+            List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
+            for (Dataset dataset : datasets) {
+                if (targetDataset != null && targetDataset.equals(dataset)) {
+                    // DDL operation already acquired the proper lock for the operation
+                    continue;
+                }
+                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            LOGGER.log(level, "locks acquired");
+            ((ActiveEntityEventsListener) listener).suspend(metadataProvider);
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, listener.getEntityId() + " suspended");
+            }
+        } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Suspend active failed", th);
             ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);
         }
     }
 
-    public void resume(MetadataProvider mdProvider) throws HyracksDataException {
-        LOGGER.log(level, "Resuming active events handler");
+    public void resumeOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider) {
         try {
-            for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Resuming " + listener.getEntityId());
-                }
-                ((ActiveEntityEventsListener) listener).resume(mdProvider);
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, listener.getEntityId() + " resumed");
-                }
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, "Resuming " + listener.getEntityId());
             }
-        } catch (Throwable th) {
+            ((ActiveEntityEventsListener) listener).resume(metadataProvider);
+            if (LOGGER.isEnabled(level)) {
+                LOGGER.log(level, listener.getEntityId() + " resumed");
+            }
+        } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Resume active failed", th);
             ExitUtil.halt(ExitUtil.EC_ACTIVE_RESUME_FAILURE);
         }
-        synchronized (this) {
-            suspended = false;
-        }
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8e86b9c..cffa178 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -70,7 +70,6 @@
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
@@ -728,23 +727,15 @@
 
     protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset,
             SourceLocation sourceLoc) throws CompilationException {
-        StringBuilder builder = null;
         ActiveNotificationHandler activeEventHandler =
                 (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
         IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
             if (listener.isEntityUsingDataset(dataset) && listener.isActive()) {
-                if (builder == null) {
-                    builder = new StringBuilder();
-                }
-                builder.append(listener.getEntityId() + "\n");
+                throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, sourceLoc,
+                        dataset.getFullyQualifiedName(), listener.getEntityId().toString());
             }
         }
-        if (builder != null) {
-            throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
-                    "Dataset " + dataset.getDataverseName() + "." + dataset.getDatasetName() + " is currently being "
-                            + "fed into by the following active entities.\n" + builder.toString());
-        }
     }
 
     protected static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map<String, String> hints,
@@ -935,7 +926,7 @@
         }
     }
 
-    public static void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
+    protected void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
             Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
         boolean bActiveTxn = true;
@@ -949,7 +940,7 @@
         try {
             index.setPendingOp(MetadataUtil.PENDING_ADD_OP);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), ds, sourceLoc);
+                validateDatasetState(metadataProvider, ds, sourceLoc);
             } else {
                 // External dataset
                 // Check if the dataset is indexible
@@ -1414,7 +1405,7 @@
         }
     }
 
-    public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
+    public void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
             boolean ifExists, IHyracksClientConnection hcc, boolean dropCorrespondingNodeGroup,
             SourceLocation sourceLoc) throws Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
@@ -1434,6 +1425,7 @@
                             dataverseName);
                 }
             }
+            validateDatasetState(metadataProvider, ds, sourceLoc);
             ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, dropCorrespondingNodeGroup,
                     sourceLoc);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
@@ -1497,23 +1489,6 @@
                 throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                         dataverseName);
             }
-            ActiveNotificationHandler activeEventHandler =
-                    (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
-            IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners();
-            StringBuilder builder = null;
-            for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.isEntityUsingDataset(ds)) {
-                    if (builder == null) {
-                        builder = new StringBuilder();
-                    }
-                    builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n");
-                }
-            }
-            if (builder != null) {
-                throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Dataset" + datasetName
-                        + " is currently being fed into by the following active entities: " + builder.toString());
-            }
-
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (index == null) {
@@ -1525,6 +1500,7 @@
                     }
                 }
                 ensureNonPrimaryIndexDrop(index, sourceLoc);
+                validateDatasetState(metadataProvider, ds, sourceLoc);
                 // #. prepare a job to drop the index in NC.
                 jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc));
 
@@ -2986,4 +2962,9 @@
         }
         return m;
     }
+
+    protected void validateDatasetState(MetadataProvider metadataProvider, Dataset dataset, SourceLocation sourceLoc)
+            throws Exception {
+        validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), dataset, sourceLoc);
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 8471d45..a25ed20 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -341,17 +341,6 @@
             throws Exception {
         Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
         if (getDatasetType() == DatasetType.INTERNAL) {
-            // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-            IActiveNotificationHandler activeListener = (IActiveNotificationHandler) metadataProvider
-                    .getApplicationContext().getActiveNotificationHandler();
-            IActiveEntityEventsListener[] activeListeners = activeListener.getEventListeners();
-            for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(this)) {
-                    throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
-                            RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
-                            listener.getEntityId().toString());
-                }
-            }
             // #. prepare jobs to drop the datatset and the indexes in NC
             List<Index> indexes =
                     MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);