[NO ISSUE][COMP] Refactor locking in the compiler
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce interface IMetadataLockUtil for methods in
MetadataLockUtil
- Make MetadataLockUtil overridable by product extensions
- Refactor dataverse and dataset creation methods in
QueryTranslator for better extensibility
Change-Id: I479be18ae68d9b8d42050e74968816767a454eb3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4424
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
index 15267aa..08f330b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
@@ -22,9 +22,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* An interface that provides an extension mechanism to extend a language with additional statements
@@ -45,10 +43,8 @@
* @param requestParameters
* @param metadataProvider
* @param resultSetId
- * @throws HyracksDataException
- * @throws AlgebricksException
+ * @throws Exception
*/
public abstract void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
- IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException;
+ IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) throws Exception;
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 0ee41cd..bd70ed4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -28,9 +28,9 @@
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.IRetryPolicy;
@@ -156,8 +156,9 @@
}
}
IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+ IMetadataLockUtil lockUtil = metadataProvider.getApplicationContext().getMetadataLockUtil();
try {
- acquirePostRecoveryLocks(lockManager);
+ acquirePostRecoveryLocks(lockManager, lockUtil);
synchronized (listener) {
if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
@@ -187,12 +188,13 @@
metadataProvider.getLocks().reset();
}
- protected void acquirePostRecoveryLocks(IMetadataLockManager lockManager) throws AlgebricksException {
+ protected void acquirePostRecoveryLocks(IMetadataLockManager lockManager, IMetadataLockUtil lockUtil)
+ throws AlgebricksException {
EntityId entityId = listener.getEntityId();
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), entityId.getDataverseName(),
entityId.getEntityName());
for (Dataset dataset : listener.getDatasets()) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDataverseName(),
+ lockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDataverseName(),
dataset.getDatasetName());
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 64a9a63..625b95a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -78,6 +78,7 @@
import org.apache.asterix.common.exceptions.WarningUtil;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.common.utils.StorageConstants;
@@ -154,7 +155,6 @@
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
@@ -234,6 +234,7 @@
protected final ExecutorService executorService;
protected final EnumSet<JobFlag> jobFlags = EnumSet.noneOf(JobFlag.class);
protected final IMetadataLockManager lockManager;
+ protected final IMetadataLockUtil lockUtil;
protected final IResponsePrinter responsePrinter;
protected final WarningCollector warningCollector;
@@ -242,6 +243,7 @@
IResponsePrinter responsePrinter) {
this.appCtx = appCtx;
this.lockManager = appCtx.getMetadataLockManager();
+ this.lockUtil = appCtx.getMetadataLockUtil();
this.statements = statements;
this.sessionOutput = output;
this.sessionConfig = output.config();
@@ -481,37 +483,39 @@
IRequestParameters requestParameters) throws Exception {
CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
DataverseName dvName = stmtCreateDataverse.getDataverseName();
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dvName);
+ lockUtil.createDataverseBegin(lockManager, metadataProvider.getLocks(), dvName);
try {
- doCreateDataverseStatement(mdTxnCtx, metadataProvider, stmtCreateDataverse);
- } catch (Exception e) {
- abort(e, e, mdTxnCtx);
- throw e;
+ doCreateDataverseStatement(metadataProvider, stmtCreateDataverse);
} finally {
metadataProvider.getLocks().unlock();
}
}
@SuppressWarnings("squid:S00112")
- protected boolean doCreateDataverseStatement(MetadataTransactionContext mdTxnCtx, MetadataProvider metadataProvider,
+ protected boolean doCreateDataverseStatement(MetadataProvider metadataProvider,
CreateDataverseStatement stmtCreateDataverse) throws Exception {
- DataverseName dvName = stmtCreateDataverse.getDataverseName();
- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
- if (dv != null) {
- if (stmtCreateDataverse.getIfNotExists()) {
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return false;
- } else {
- throw new CompilationException(ErrorCode.DATAVERSE_EXISTS, stmtCreateDataverse.getSourceLocation(),
- dvName);
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ try {
+ DataverseName dvName = stmtCreateDataverse.getDataverseName();
+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+ if (dv != null) {
+ if (stmtCreateDataverse.getIfNotExists()) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return false;
+ } else {
+ throw new CompilationException(ErrorCode.DATAVERSE_EXISTS, stmtCreateDataverse.getSourceLocation(),
+ dvName);
+ }
}
+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
+ new Dataverse(dvName, stmtCreateDataverse.getFormat(), MetadataUtil.PENDING_NO_OP));
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return true;
+ } catch (Exception e) {
+ abort(e, e, mdTxnCtx);
+ throw e;
}
- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(),
- new Dataverse(dvName, stmtCreateDataverse.getFormat(), MetadataUtil.PENDING_NO_OP));
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- return true;
}
protected static void validateCompactionPolicy(String compactionPolicy,
@@ -552,13 +556,10 @@
}
public void handleCreateDatasetStatement(MetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, IRequestParameters requestParameters) throws CompilationException, Exception {
- MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
+ IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
DatasetDecl dd = (DatasetDecl) stmt;
- SourceLocation sourceLoc = dd.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(dd.getDataverse());
String datasetName = dd.getName().getValue();
- DatasetType dsType = dd.getDatasetType();
DataverseName itemTypeDataverseName = getActiveDataverseName(dd.getItemTypeDataverse());
String itemTypeName = dd.getItemTypeName().getValue();
DataverseName metaItemTypeDataverseName = null;
@@ -571,16 +572,34 @@
Identifier ngNameId = dd.getNodegroupName();
String nodegroupName = ngNameId == null ? null : ngNameId.getValue();
String compactionPolicy = dd.getCompactionPolicy();
+ boolean defaultCompactionPolicy = compactionPolicy == null;
+
+ lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName,
+ itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName, metaItemTypeName, nodegroupName,
+ compactionPolicy, defaultCompactionPolicy, dd.getDatasetDetailsDecl());
+ try {
+ doCreateDatasetStatement(metadataProvider, dd, dataverseName, datasetName, itemTypeDataverseName,
+ itemTypeName, metaItemTypeDataverseName, metaItemTypeName, hcc, requestParameters);
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+
+ protected void doCreateDatasetStatement(MetadataProvider metadataProvider, DatasetDecl dd,
+ DataverseName dataverseName, String datasetName, DataverseName itemTypeDataverseName, String itemTypeName,
+ DataverseName metaItemTypeDataverseName, String metaItemTypeName, IHyracksClientConnection hcc,
+ IRequestParameters requestParameters) throws Exception {
+ MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
+ SourceLocation sourceLoc = dd.getSourceLocation();
+ DatasetType dsType = dd.getDatasetType();
+ Identifier ngNameId = dd.getNodegroupName();
+ String compactionPolicy = dd.getCompactionPolicy();
Map<String, String> compactionPolicyProperties = dd.getCompactionPolicyProperties();
String compressionScheme = metadataProvider.getCompressionManager()
.getDdlOrDefaultCompressionScheme(dd.getDatasetCompressionScheme());
- boolean defaultCompactionPolicy = compactionPolicy == null;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName,
- itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName, metaItemTypeName, nodegroupName,
- compactionPolicy, defaultCompactionPolicy);
Dataset dataset = null;
try {
IDatasetDetails datasetDetails;
@@ -739,8 +758,6 @@
}
}
throw e;
- } finally {
- metadataProvider.getLocks().unlock();
}
}
@@ -796,7 +813,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
- MetadataLockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
try {
Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds == null) {
@@ -1209,7 +1226,7 @@
String typeName = stmtCreateType.getIdent().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName);
+ lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
@@ -1243,25 +1260,27 @@
protected void handleDataverseDropStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- SourceLocation sourceLoc = stmtDelete.getSourceLocation();
- DataverseName dataverseName = stmtDelete.getDataverseName();
- if (dataverseName.equals(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME)) {
+ DataverseDropStatement stmtDropDataverse = (DataverseDropStatement) stmt;
+ SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation();
+ DataverseName dataverseName = stmtDropDataverse.getDataverseName();
+ if (dataverseName.equals(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME)
+ || dataverseName.equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
- MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME + " dataverse can't be dropped");
+ dataverseName + " dataverse can't be dropped");
}
- lockManager.acquireDataverseWriteLock(metadataProvider.getLocks(), dataverseName);
+ lockUtil.dropDataverseBegin(lockManager, metadataProvider.getLocks(), dataverseName);
try {
- doDropDataverse(stmtDelete, sourceLoc, metadataProvider, hcc);
+ doDropDataverse(stmtDropDataverse, metadataProvider, hcc, requestParameters);
} finally {
metadataProvider.getLocks().unlock();
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
- protected boolean doDropDataverse(DataverseDropStatement stmtDelete, SourceLocation sourceLoc,
- MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
- DataverseName dataverseName = stmtDelete.getDataverseName();
+ protected boolean doDropDataverse(DataverseDropStatement stmtDropDataverse, MetadataProvider metadataProvider,
+ IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
+ SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation();
+ DataverseName dataverseName = stmtDropDataverse.getDataverseName();
ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
@@ -1270,7 +1289,7 @@
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
- if (stmtDelete.getIfExists()) {
+ if (stmtDropDataverse.getIfExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return false;
} else {
@@ -1420,17 +1439,19 @@
SourceLocation sourceLoc = stmtDelete.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
String datasetName = stmtDelete.getDatasetName().getValue();
- MetadataLockUtil.dropDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.dropDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
try {
- doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc, true, sourceLoc);
+ doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc,
+ requestParameters, true, sourceLoc);
} finally {
metadataProvider.getLocks().unlock();
+ ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
public void doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider,
- boolean ifExists, IHyracksClientConnection hcc, boolean dropCorrespondingNodeGroup,
- SourceLocation sourceLoc) throws Exception {
+ boolean ifExists, IHyracksClientConnection hcc, IRequestParameters requestParameters,
+ boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc) throws Exception {
MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
MutableObject<MetadataTransactionContext> mdTxnCtx =
new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
@@ -1484,8 +1505,6 @@
}
}
throw e;
- } finally {
- ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
@@ -1502,7 +1521,7 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
- MetadataLockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
// For external index
boolean dropFilesIndex = false;
try {
@@ -1676,7 +1695,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.dropTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName);
+ lockUtil.dropTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName);
try {
Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
if (dt == null) {
@@ -1730,8 +1749,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createFunctionBegin(lockManager, metadataProvider.getLocks(), dataverseName,
- signature.getName());
+ lockUtil.createFunctionBegin(lockManager, metadataProvider.getLocks(), dataverseName, signature.getName());
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
@@ -1808,7 +1826,7 @@
signature.setDataverseName(getActiveDataverseName(signature.getDataverseName()));
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.dropFunctionBegin(lockManager, metadataProvider.getLocks(), signature.getDataverseName(),
+ lockUtil.dropFunctionBegin(lockManager, metadataProvider.getLocks(), signature.getDataverseName(),
signature.getName());
try {
Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
@@ -1839,7 +1857,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
try {
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
@@ -1872,7 +1890,7 @@
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() throws AlgebricksException {
- MetadataLockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName,
+ lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName,
stmtInsertUpsert.getDatasetName());
}
@@ -1934,7 +1952,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName,
+ lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), dataverseName,
stmtDelete.getDatasetName());
try {
metadataProvider.setWriteTransaction(true);
@@ -2027,7 +2045,7 @@
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.createFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.createFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
try {
Feed feed =
MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -2060,7 +2078,7 @@
SourceLocation sourceLoc = cfps.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(null);
String policyName = cfps.getPolicyName();
- MetadataLockUtil.createFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName);
+ lockUtil.createFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName);
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2122,7 +2140,7 @@
String feedName = stmtFeedDrop.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.dropFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.dropFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
try {
Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
if (feed == null) {
@@ -2173,7 +2191,7 @@
SourceLocation sourceLoc = stmtFeedPolicyDrop.getSourceLocation();
DataverseName dataverseName = getActiveDataverseName(stmtFeedPolicyDrop.getDataverseName());
String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
- MetadataLockUtil.dropFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName);
+ lockUtil.dropFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName);
try {
FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
if (feedPolicy == null) {
@@ -2202,7 +2220,7 @@
String feedName = sfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean committed = false;
- MetadataLockUtil.startFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.startFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
try {
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// Runtime handler
@@ -2262,7 +2280,7 @@
throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
"Feed " + feedName + " is not started.");
}
- MetadataLockUtil.stopFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
+ lockUtil.stopFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
try {
listener.stop(metadataProvider);
} finally {
@@ -2286,8 +2304,7 @@
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
// Transaction handling
- MetadataLockUtil.connectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName,
- feedName);
+ lockUtil.connectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName, feedName);
try {
// validation
Dataset dataset = FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName);
@@ -2337,8 +2354,7 @@
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- MetadataLockUtil.disconnectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName,
- feedName);
+ lockUtil.disconnectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName, feedName);
try {
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -2385,7 +2401,7 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
- MetadataLockUtil.compactBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.compactBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
try {
Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds == null) {
@@ -2698,7 +2714,7 @@
Dataset transactionDataset = null;
boolean lockAquired = false;
boolean success = false;
- MetadataLockUtil.refreshDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.refreshDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
try {
ds = metadataProvider.findDataset(dataverseName, datasetName);
// Dataset exists ?
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e2fbe35..336839a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -70,6 +70,7 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.external.library.ExternalLibraryManager;
@@ -79,6 +80,7 @@
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
import org.apache.asterix.metadata.lock.MetadataLockManager;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -188,7 +190,8 @@
CCExtensionManager ccExtensionManager) throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager);
+ new MetadataLockManager(), createMetadataLockUtil(), receptionistFactory, configValidatorFactory,
+ ccExtensionManager);
}
protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
@@ -199,6 +202,10 @@
return new NcLifecycleCoordinator(ccServiceCtx, replicationEnabled);
}
+ protected IMetadataLockUtil createMetadataLockUtil() {
+ return new MetadataLockUtil();
+ }
+
@Override
public void configureLoggingLevel(Level level) {
super.configureLoggingLevel(level);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 21b364f..ded5393 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -41,6 +41,7 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.api.ICCExtensionManager;
@@ -49,6 +50,7 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.lock.MetadataLockManager;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.runtime.functions.FunctionCollection;
import org.apache.asterix.runtime.functions.FunctionManager;
import org.apache.asterix.runtime.utils.CcApplicationContext;
@@ -94,6 +96,7 @@
static IStorageComponentProvider componentProvider;
static JobIdFactory jobIdFactory;
static IMetadataLockManager lockManager = new MetadataLockManager();
+ static IMetadataLockUtil lockUtil = new MetadataLockUtil();
static AlgebricksAbsolutePartitionConstraint locations;
static ExecutorService executor;
@@ -122,6 +125,7 @@
hcc = Mockito.mock(IHyracksClientConnection.class);
Mockito.when(appCtx.getActiveNotificationHandler()).thenReturn(handler);
Mockito.when(appCtx.getMetadataLockManager()).thenReturn(lockManager);
+ Mockito.when(appCtx.getMetadataLockUtil()).thenReturn(lockUtil);
Mockito.when(appCtx.getServiceContext()).thenReturn(ccServiceCtx);
Mockito.when(appCtx.getClusterStateManager()).thenReturn(clusterStateManager);
Mockito.when(appCtx.getActiveProperties()).thenReturn(Mockito.mock(ActiveProperties.class));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
index 36f704c..1e2a795 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
@@ -26,22 +26,24 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.metadata.api.IActiveEntityController;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class TestUserActor extends Actor {
private TestClusterControllerActor clusterController;
private IMetadataLockManager lockManager;
+ private IMetadataLockUtil lockUtil;
public TestUserActor(String name, MetadataProvider metadataProvider, TestClusterControllerActor clusterController) {
super(name, metadataProvider);
this.clusterController = clusterController;
this.lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
+ this.lockUtil = metadataProvider.getApplicationContext().getMetadataLockUtil();
}
public Action startActivity(IActiveEntityController actionListener) {
@@ -54,8 +56,8 @@
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
List<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(),
- dataset.getDataverseName(), dataset.getDatasetName());
+ lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(),
+ dataset.getDatasetName());
}
actionListener.start(mdProvider);
} finally {
@@ -77,8 +79,8 @@
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
List<Dataset> datasets = actionListener.getDatasets();
for (Dataset dataset : datasets) {
- MetadataLockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(),
- dataset.getDataverseName(), dataset.getDatasetName());
+ lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(),
+ dataset.getDatasetName());
}
actionListener.stop(mdProvider);
} finally {
@@ -197,7 +199,7 @@
DataverseName dataverseName = dataset.getDataverseName();
String datasetName = dataset.getDatasetName();
try {
- MetadataLockUtil.createIndexBegin(lockManager, mdProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.createIndexBegin(lockManager, mdProvider.getLocks(), dataverseName, datasetName);
if (actionListener.isActive()) {
throw new RuntimeDataException(ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY,
DatasetUtil.getFullyQualifiedDisplayName(dataverseName, datasetName) + ".index",
@@ -219,7 +221,7 @@
DataverseName dataverseName = dataset.getDataverseName();
String datasetName = dataset.getDatasetName();
try {
- MetadataLockUtil.dropIndexBegin(lockManager, mdProvider.getLocks(), dataverseName, datasetName);
+ lockUtil.dropIndexBegin(lockManager, mdProvider.getLocks(), dataverseName, datasetName);
if (actionListener.isActive()) {
throw new RuntimeDataException(
ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3389962..d764b98 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -92,6 +93,11 @@
IMetadataLockManager getMetadataLockManager();
/**
+ * @return the metadata lock utility
+ */
+ IMetadataLockUtil getMetadataLockUtil();
+
+ /**
* @return the metadata bootstrap
*/
IMetadataBootstrap getMetadataBootstrap();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
new file mode 100644
index 0000000..7f02653
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.metadata;
+
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+
+public interface IMetadataLockUtil {
+
+ // Dataverse helpers
+
+ void createDataverseBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName)
+ throws AlgebricksException;
+
+ void dropDataverseBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName)
+ throws AlgebricksException;
+
+ // Dataset helpers
+
+ void createDatasetBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName, DataverseName itemTypeDataverseName, String itemTypeName,
+ DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
+ String compactionPolicyName, boolean isDefaultCompactionPolicy, Object datasetDetails)
+ throws AlgebricksException;
+
+ void dropDatasetBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void modifyDatasetBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void refreshDatasetBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void compactBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String datasetName)
+ throws AlgebricksException;
+
+ void insertDeleteUpsertBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ // Index helpers
+
+ void createIndexBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ void dropIndexBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException;
+
+ // Type helpers
+
+ void createTypeBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String typeName)
+ throws AlgebricksException;
+
+ void dropTypeBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String typeName)
+ throws AlgebricksException;
+
+ // Function helpers
+
+ void createFunctionBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String functionName) throws AlgebricksException;
+
+ void dropFunctionBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String functionName) throws AlgebricksException;
+
+ // Feed helpers
+
+ void createFeedPolicyBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String policyName) throws AlgebricksException;
+
+ void dropFeedPolicyBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String policyName) throws AlgebricksException;
+
+ void createFeedBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void dropFeedBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void startFeedBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void stopFeedBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName, String feedName)
+ throws AlgebricksException;
+
+ void connectFeedBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName, String feedName) throws AlgebricksException;
+
+ void disconnectFeedBegin(IMetadataLockManager lockManager, LockList locks, DataverseName dataverseName,
+ String datasetName, String feedName) throws AlgebricksException;
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index e6ab9c8..3e5ccd1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -20,17 +20,39 @@
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.metadata.LockList;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-public class MetadataLockUtil {
+public class MetadataLockUtil implements IMetadataLockUtil {
- private MetadataLockUtil() {
+ @Override
+ public void createDataverseBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName)
+ throws AlgebricksException {
+ lockMgr.acquireDataverseReadLock(locks, dataverseName);
}
- public static void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropDataverseBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName)
+ throws AlgebricksException {
+ lockMgr.acquireDataverseWriteLock(locks, dataverseName);
+ }
+
+ @Override
+ public void createDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName, DataverseName itemTypeDataverseName, String itemTypeName,
DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
+ String compactionPolicyName, boolean isDefaultCompactionPolicy, Object datasetDetails)
+ throws AlgebricksException {
+ createDatasetBeginPre(lockMgr, locks, dataverseName, itemTypeDataverseName, itemTypeName,
+ metaItemTypeDataverseName, metaItemTypeName, nodeGroupName, compactionPolicyName,
+ isDefaultCompactionPolicy);
+ lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
+ }
+
+ protected final void createDatasetBeginPre(IMetadataLockManager lockMgr, LockList locks,
+ DataverseName dataverseName, DataverseName itemTypeDataverseName, String itemTypeName,
+ DataverseName metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName,
String compactionPolicyName, boolean isDefaultCompactionPolicy) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
if (!dataverseName.equals(itemTypeDataverseName)) {
@@ -51,121 +73,139 @@
if (!isDefaultCompactionPolicy) {
lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName);
}
- lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
}
- public static void createIndexBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void createIndexBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetCreateIndexLock(locks, dataverseName, datasetName);
}
- public static void dropIndexBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropIndexBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
}
- public static void createTypeBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void createTypeBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String typeName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDataTypeWriteLock(locks, dataverseName, typeName);
}
- public static void dropDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetWriteLock(locks, dataverseName, datasetName);
}
- public static void dropTypeBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropTypeBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String typeName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDataTypeWriteLock(locks, dataverseName, typeName);
}
- public static void createFunctionBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void createFunctionBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String functionName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireFunctionWriteLock(locks, dataverseName, functionName);
}
- public static void dropFunctionBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropFunctionBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String functionName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireFunctionWriteLock(locks, dataverseName, functionName);
}
- public static void modifyDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void modifyDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
}
- public static void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList locks,
- DataverseName dataverseName, String datasetName) throws AlgebricksException {
+ @Override
+ public void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
}
- public static void dropFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityWriteLock(locks, dataverseName, feedName);
}
- public static void dropFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void dropFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String policyName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityWriteLock(locks, dataverseName, policyName);
}
- public static void startFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void startFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
}
- public static void stopFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void stopFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String feedName) throws AlgebricksException {
// TODO: dataset lock?
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
}
- public static void createFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void createFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityWriteLock(locks, dataverseName, feedName);
}
- public static void connectFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void connectFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName, String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
lockMgr.acquireDatasetReadLock(locks, dataverseName, datasetName);
}
- public static void createFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void createFeedPolicyBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String policyName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireFeedPolicyWriteLock(locks, dataverseName, policyName);
}
- public static void disconnectFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void disconnectFeedBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName, String feedName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireActiveEntityReadLock(locks, dataverseName, feedName);
lockMgr.acquireDatasetReadLock(locks, dataverseName, datasetName);
}
- public static void compactBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void compactBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetReadLock(locks, dataverseName, datasetName);
}
- public static void refreshDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
+ @Override
+ public void refreshDatasetBegin(IMetadataLockManager lockMgr, LockList locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
lockMgr.acquireDatasetExclusiveModificationLock(locks, dataverseName, datasetName);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index a0b10c6..3de4a88 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -47,6 +47,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -91,6 +92,7 @@
private INcLifecycleCoordinator ftStrategy;
private IJobLifecycleListener activeLifeCycleListener;
private IMetadataLockManager mdLockManager;
+ private IMetadataLockUtil mdLockUtil;
private IClusterStateManager clusterStateManager;
private final INodeJobTracker nodeJobTracker;
private final ITxnIdFactory txnIdFactory;
@@ -103,7 +105,7 @@
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory,
+ IMetadataLockManager mdLockManager, IMetadataLockUtil mdLockUtil, IReceptionistFactory receptionistFactory,
IConfigValidatorFactory configValidatorFactory, Object extensionManager)
throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
@@ -130,6 +132,7 @@
this.globalRecoveryManager = globalRecoveryManager;
this.storageComponentProvider = storageComponentProvider;
this.mdLockManager = mdLockManager;
+ this.mdLockUtil = mdLockUtil;
clusterStateManager = new ClusterStateManager();
clusterStateManager.setCcAppCtx(this);
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
@@ -268,6 +271,11 @@
}
@Override
+ public IMetadataLockUtil getMetadataLockUtil() {
+ return mdLockUtil;
+ }
+
+ @Override
public IClusterStateManager getClusterStateManager() {
return clusterStateManager;
}