[NO ISSUE][ING] Refactor Active Suspend/Resume Logic
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Refactor the logic for checking DDLs on connected datasets.
- Refactor the suspend listener API to allow suspending a listener
for a DDL operation on a dataset.
- Allow suspended active listeners to be unregistered. This is
done to support removing suspended listeners on active entities
that were dropped.
Change-Id: I38254582e08d97951a949f7327c8c3d7cf2ab51d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2999
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2d36bb2..ca610aa 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -85,6 +85,11 @@
boolean isActive();
/**
+ * @return true, if this {@link IActiveEntityEventsListener} is suspended. Otherwise false.
+ */
+ boolean isSuspended();
+
+ /**
* unregister the listener upon deletion of entity
*
* @throws HyracksDataException
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 62f2c02..783d823 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -681,6 +681,11 @@
}
@Override
+ public synchronized boolean isSuspended() {
+ return suspended;
+ }
+
+ @Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\"" + "\"entityId\":\"" + entityId + "\""
+ "\"state\":\"" + state + "\"" + "}";
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 6eba4ea..a572e28 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -36,7 +36,6 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -219,7 +218,7 @@
if (registeredListener == null) {
throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId());
}
- if (registeredListener.isActive()) {
+ if (registeredListener.isActive() && !registeredListener.isSuspended()) {
entityEventListeners.put(registeredListener.getEntityId(), registeredListener);
throw new RuntimeDataException(ErrorCode.CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER, listener.getEntityId());
}
@@ -251,8 +250,7 @@
}
}
- public void suspend(MetadataProvider mdProvider)
- throws AlgebricksException, HyracksDataException, InterruptedException {
+ public void suspend(MetadataProvider mdProvider) throws HyracksDataException {
synchronized (this) {
if (suspended) {
throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
@@ -260,54 +258,67 @@
LOGGER.log(level, "Suspending active events handler");
suspended = true;
}
+ Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values();
+ for (IActiveEntityEventsListener listener : registeredListeners) {
+ suspendForDdlOrHalt(listener, mdProvider, null);
+ }
+ }
+
+ public void resume(MetadataProvider mdProvider) {
+ LOGGER.log(level, "Resuming active events handler");
+ for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
+ resumeOrHalt(listener, mdProvider);
+ }
+ synchronized (this) {
+ suspended = false;
+ }
+ }
+
+ public void suspendForDdlOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider,
+ Dataset targetDataset) {
try {
- IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager();
- Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values();
- for (IActiveEntityEventsListener listener : registeredListeners) {
- // write lock the listener
- // exclusive lock all the datasets
- String dataverseName = listener.getEntityId().getDataverse();
- String entityName = listener.getEntityId().getEntityName();
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Suspending " + listener.getEntityId());
- }
- LOGGER.log(level, "Acquiring locks");
- lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName);
- List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
- for (Dataset dataset : datasets) {
- lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
- DatasetUtil.getFullyQualifiedName(dataset));
- }
- LOGGER.log(level, "locks acquired");
- ((ActiveEntityEventsListener) listener).suspend(mdProvider);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, listener.getEntityId() + " suspended");
- }
+ // write lock the listener
+ // exclusive lock all the datasets (except the target dataset)
+ IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+ String dataverseName = listener.getEntityId().getDataverse();
+ String entityName = listener.getEntityId().getEntityName();
+ if (LOGGER.isEnabled(level)) {
+ LOGGER.log(level, "Suspending " + listener.getEntityId());
}
- } catch (Throwable th) {
+ LOGGER.log(level, "Acquiring locks");
+ lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName + '.' + entityName);
+ List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
+ for (Dataset dataset : datasets) {
+ if (targetDataset != null && targetDataset.equals(dataset)) {
+ // DDL operation already acquired the proper lock for the operation
+ continue;
+ }
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+ DatasetUtil.getFullyQualifiedName(dataset));
+ }
+ LOGGER.log(level, "locks acquired");
+ ((ActiveEntityEventsListener) listener).suspend(metadataProvider);
+ if (LOGGER.isEnabled(level)) {
+ LOGGER.log(level, listener.getEntityId() + " suspended");
+ }
+ } catch (Throwable th) { // NOSONAR must halt in case of any failure
LOGGER.error("Suspend active failed", th);
ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);
}
}
- public void resume(MetadataProvider mdProvider) throws HyracksDataException {
- LOGGER.log(level, "Resuming active events handler");
+ public void resumeOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider) {
try {
- for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Resuming " + listener.getEntityId());
- }
- ((ActiveEntityEventsListener) listener).resume(mdProvider);
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, listener.getEntityId() + " resumed");
- }
+ if (LOGGER.isEnabled(level)) {
+ LOGGER.log(level, "Resuming " + listener.getEntityId());
}
- } catch (Throwable th) {
+ ((ActiveEntityEventsListener) listener).resume(metadataProvider);
+ if (LOGGER.isEnabled(level)) {
+ LOGGER.log(level, listener.getEntityId() + " resumed");
+ }
+ } catch (Throwable th) { // NOSONAR must halt in case of any failure
LOGGER.error("Resume active failed", th);
ExitUtil.halt(ExitUtil.EC_ACTIVE_RESUME_FAILURE);
}
- synchronized (this) {
- suspended = false;
- }
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8e86b9c..cffa178 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -70,7 +70,6 @@
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
@@ -728,23 +727,15 @@
protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset,
SourceLocation sourceLoc) throws CompilationException {
- StringBuilder builder = null;
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners();
for (IActiveEntityEventsListener listener : listeners) {
if (listener.isEntityUsingDataset(dataset) && listener.isActive()) {
- if (builder == null) {
- builder = new StringBuilder();
- }
- builder.append(listener.getEntityId() + "\n");
+ throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET, sourceLoc,
+ dataset.getFullyQualifiedName(), listener.getEntityId().toString());
}
}
- if (builder != null) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
- "Dataset " + dataset.getDataverseName() + "." + dataset.getDatasetName() + " is currently being "
- + "fed into by the following active entities.\n" + builder.toString());
- }
}
protected static String configureNodegroupForDataset(ICcApplicationContext appCtx, Map<String, String> hints,
@@ -935,7 +926,7 @@
}
}
- public static void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
+ protected void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception {
ProgressState progress = ProgressState.NO_PROGRESS;
boolean bActiveTxn = true;
@@ -949,7 +940,7 @@
try {
index.setPendingOp(MetadataUtil.PENDING_ADD_OP);
if (ds.getDatasetType() == DatasetType.INTERNAL) {
- validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), ds, sourceLoc);
+ validateDatasetState(metadataProvider, ds, sourceLoc);
} else {
// External dataset
// Check if the dataset is indexible
@@ -1414,7 +1405,7 @@
}
}
- public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
+ public void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
boolean ifExists, IHyracksClientConnection hcc, boolean dropCorrespondingNodeGroup,
SourceLocation sourceLoc) throws Exception {
MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
@@ -1434,6 +1425,7 @@
dataverseName);
}
}
+ validateDatasetState(metadataProvider, ds, sourceLoc);
ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, dropCorrespondingNodeGroup,
sourceLoc);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
@@ -1497,23 +1489,6 @@
throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
dataverseName);
}
- ActiveNotificationHandler activeEventHandler =
- (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
- IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners();
- StringBuilder builder = null;
- for (IActiveEntityEventsListener listener : listeners) {
- if (listener.isEntityUsingDataset(ds)) {
- if (builder == null) {
- builder = new StringBuilder();
- }
- builder.append(new FeedConnectionId(listener.getEntityId(), datasetName) + "\n");
- }
- }
- if (builder != null) {
- throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Dataset" + datasetName
- + " is currently being fed into by the following active entities: " + builder.toString());
- }
-
if (ds.getDatasetType() == DatasetType.INTERNAL) {
Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
if (index == null) {
@@ -1525,6 +1500,7 @@
}
}
ensureNonPrimaryIndexDrop(index, sourceLoc);
+ validateDatasetState(metadataProvider, ds, sourceLoc);
// #. prepare a job to drop the index in NC.
jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc));
@@ -2986,4 +2962,9 @@
}
return m;
}
+
+ protected void validateDatasetState(MetadataProvider metadataProvider, Dataset dataset, SourceLocation sourceLoc)
+ throws Exception {
+ validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), dataset, sourceLoc);
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 8471d45..a25ed20 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -341,17 +341,6 @@
throws Exception {
Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
if (getDatasetType() == DatasetType.INTERNAL) {
- // prepare job spec(s) that would disconnect any active feeds involving the dataset.
- IActiveNotificationHandler activeListener = (IActiveNotificationHandler) metadataProvider
- .getApplicationContext().getActiveNotificationHandler();
- IActiveEntityEventsListener[] activeListeners = activeListener.getEventListeners();
- for (IActiveEntityEventsListener listener : activeListeners) {
- if (listener.isEntityUsingDataset(this)) {
- throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
- RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
- listener.getEntityId().toString());
- }
- }
// #. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes =
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);