Remove the global metadata lock in favor of fine-grained metadata locks
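
Replace the single global metadata latch in AqlTranslator with fine-grained
read/write locks managed by a new MetadataLockManager singleton. Each
statement handler now extracts the names of the entities it touches
(dataverse, dataset, datatype, nodegroup, feed, function) before beginning
its metadata transaction, acquires the corresponding locks, and releases
them in its finally block. Queries, inserts, and deletes additionally lock
every dataverse and dataset they reference; the AQL parser now collects
those names while parsing. ExternalDatasetsRegistry is flattened to a single
ConcurrentHashMap keyed by "dataverse.dataset", and buildIndexBegin/End take
a flag distinguishing a dataset's first (files) index, which needs an
exclusive hold, from later index builds, which share a read hold. The
access-method rules now skip indexes whose metadata operation is still
pending (not PENDING_NO_OP).

The per-handler locking pattern, sketched with the dataset-drop lock calls
from this change:

    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
    try {
        // ... translate and run the statement ...
    } finally {
        MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
    }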
Change-Id: Id05ff463fee356b3270b53d0b3137c4b1bc3d830
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/84
Reviewed-by: Sattam Alsubaiee <salsubaiee@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 600295c..c121da3 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.mutable.Mutable;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Index;
@@ -256,7 +257,8 @@
List<Index> indexCandidates = new ArrayList<Index>();
// Add an index to the candidates if one of the indexed fields is fieldName
for (Index index : datasetIndexes) {
- if (index.getKeyFieldNames().contains(fieldName)) {
+ // Only consider the index if it has no pending metadata operation (PENDING_NO_OP)
+ if (index.getKeyFieldNames().contains(fieldName) && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
indexCandidates.add(index);
}
}
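With the global latch gone, the optimizer can observe an index whose create or
drop is still in flight, so candidacy is additionally gated on the index's
pending-operation state. A minimal sketch of the guard, assuming the Index and
IMetadataEntity types used in this hunk:

    // An index is a usable access path only if it covers the field and no
    // metadata operation (create/drop) on it is still pending.
    private static boolean isUsableCandidate(Index index, String fieldName) {
        return index.getKeyFieldNames().contains(fieldName)
                && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP;
    }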
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index b6d09b1..e0635c6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -110,6 +110,7 @@
import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
+import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
@@ -364,14 +365,12 @@
private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
throws Exception {
-
+ DataverseDecl dvd = (DataverseDecl) stmt;
+ String dvName = dvd.getDataverseName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
-
+ MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
try {
- DataverseDecl dvd = (DataverseDecl) stmt;
- String dvName = dvd.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
if (dv == null) {
throw new MetadataException("Unknown dataverse " + dvName);
@@ -382,19 +381,19 @@
abort(e, e, mdTxnCtx);
throw new MetadataException(e);
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
}
}
private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
+ MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
try {
- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
- String dvName = stmtCreateDataverse.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
if (dv != null) {
if (stmtCreateDataverse.getIfNotExists()) {
@@ -411,7 +410,7 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
}
}
@@ -436,21 +435,25 @@
IHyracksClientConnection hcc) throws AsterixException, Exception {
ProgressState progress = ProgressState.NO_PROGRESS;
+ DatasetDecl dd = (DatasetDecl) stmt;
+ String dataverseName = getActiveDataverseName(dd.getDataverse());
+ String datasetName = dd.getName().getValue();
+ DatasetType dsType = dd.getDatasetType();
+ String itemTypeName = dd.getItemTypeName().getValue();
+ Identifier ngNameId = dd.getDatasetDetailsDecl().getNodegroupName();
+ String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
+ String compactionPolicy = dd.getDatasetDetailsDecl().getCompactionPolicy();
+ Map<String, String> compactionPolicyProperties = dd.getDatasetDetailsDecl().getCompactionPolicyProperties();
+ boolean defaultCompactionPolicy = (compactionPolicy == null);
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
- String dataverseName = null;
- String datasetName = null;
+ MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, dataverseName + "." + itemTypeName,
+ nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
Dataset dataset = null;
try {
- DatasetDecl dd = (DatasetDecl) stmt;
- dataverseName = getActiveDataverseName(dd.getDataverse());
- datasetName = dd.getName().getValue();
-
- DatasetType dsType = dd.getDatasetType();
- String itemTypeName = dd.getItemTypeName().getValue();
IDatasetDetails datasetDetails = null;
Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
@@ -480,13 +483,8 @@
ARecordType aRecordType = (ARecordType) itemType;
aRecordType.validatePartitioningExpressions(partitioningExprs, autogenerated);
- Identifier ngNameId = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName();
String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd,
dataverseName, mdTxnCtx);
-
- String compactionPolicy = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getCompactionPolicy();
- Map<String, String> compactionPolicyProperties = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
- .getCompactionPolicyProperties();
if (compactionPolicy == null) {
compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
@@ -505,12 +503,9 @@
case EXTERNAL: {
String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
- Identifier ngNameId = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName();
+
String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd,
dataverseName, mdTxnCtx);
- String compactionPolicy = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getCompactionPolicy();
- Map<String, String> compactionPolicyProperties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
- .getCompactionPolicyProperties();
if (compactionPolicy == null) {
compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
@@ -605,7 +600,20 @@
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, dataverseName + "." + itemTypeName,
+ nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
+ }
+ }
+
+ private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
+ if (ngNameId != null) {
+ return ngNameId.getValue();
+ }
+ String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+ if (hintValue == null) {
+ return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+ } else {
+ return (dataverse + ":" + dd.getName().getValue());
}
}
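For reference, getNodeGroupName resolves in this order: an explicitly declared
nodegroup wins; otherwise a dataset-specific nodegroup name is generated when a
nodegroup-cardinality hint is present; otherwise the default metadata nodegroup
is used. Illustrative outcomes (hypothetical names):

    getNodeGroupName(new Identifier("myGroup"), dd, "tpch"); // -> "myGroup"
    getNodeGroupName(null, ddWithCardinalityHint, "tpch");   // -> "tpch:Customers" (dd named "Customers")
    getNodeGroupName(null, ddWithoutHint, "tpch");           // -> MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME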
@@ -664,13 +672,16 @@
private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
ProgressState progress = ProgressState.NO_PROGRESS;
+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
- String dataverseName = null;
- String datasetName = null;
+ MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
String indexName = null;
JobSpecification spec = null;
Dataset ds = null;
@@ -681,9 +692,6 @@
Index filesIndex = null;
boolean datasetLocked = false;
try {
- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
- dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
- datasetName = stmtCreateIndex.getDatasetName().getValue();
ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
@@ -738,13 +746,24 @@
if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
throw new AlgebricksException("external dataset index name is invalid");
}
- // lock external dataset
- ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds);
- datasetLocked = true;
+
// Check if the files index exist
filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
firstExternalDatasetIndex = (filesIndex == null);
+ // lock external dataset
+ ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+ datasetLocked = true;
+ if (firstExternalDatasetIndex) {
+ // re-check, now that the lock is held, that another statement did not create the files index concurrently
+ filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+ dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+ if (filesIndex != null) {
+ ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
+ firstExternalDatasetIndex = false;
+ ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+ }
+ }
if (firstExternalDatasetIndex) {
// Get snapshot from External File System
externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
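The files-index check above is an optimistic check followed by a re-validation:
the registry grants an exclusive hold for a dataset's first index build but only
a shared hold for later ones, so a builder that believed it was first must
re-check after locking and downgrade if it lost the race. Distilled (sketch;
lookupFilesIndex() is a hypothetical stand-in for the MetadataManager.getIndex
call):

    boolean first = (lookupFilesIndex() == null);           // optimistic check
    ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, first);
    if (first && lookupFilesIndex() != null) {
        // Lost the race: the files index appeared after the optimistic check.
        // Release the exclusive hold and re-acquire as a shared (non-first) one.
        ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, true);
        first = false;
        ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, false);
    }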
@@ -914,23 +933,22 @@
}
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
if (datasetLocked) {
- ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds);
+ ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
}
}
}
private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-
+ TypeDecl stmtCreateType = (TypeDecl) stmt;
+ String dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
+ String typeName = stmtCreateType.getIdent().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
-
+ MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
try {
- TypeDecl stmtCreateType = (TypeDecl) stmt;
- String dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
- String typeName = stmtCreateType.getIdent().getValue();
+
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
throw new AlgebricksException("Unknown dataverse " + dataverseName);
@@ -956,24 +974,22 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
}
}
private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+ String dataverseName = stmtDelete.getDataverseName().getValue();
ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
-
- String dataverseName = null;
+ MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
- dataverseName = stmtDelete.getDataverseName().getValue();
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
if (dv == null) {
@@ -1041,6 +1057,7 @@
datasets.get(j)));
}
}
+ ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
}
}
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
@@ -1069,7 +1086,6 @@
if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
activeDefaultDataverse = null;
}
- ExternalDatasetsRegistry.INSTANCE.removeDataverse(dv);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (bActiveTxn) {
@@ -1107,26 +1123,24 @@
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
}
}
private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ DropStatement stmtDelete = (DropStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
+ String datasetName = stmtDelete.getDatasetName().getValue();
ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
- String dataverseName = null;
- String datasetName = null;
+ MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
- DropStatement stmtDelete = (DropStatement) stmt;
- dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
- datasetName = stmtDelete.getDatasetName().getValue();
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds == null) {
@@ -1194,6 +1208,7 @@
metadataProvider.setMetadataTxnContext(mdTxnCtx);
} else {
// External dataset
+ ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
//#. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int j = 0; j < indexes.size(); j++) {
@@ -1275,28 +1290,28 @@
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
}
}
private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
+ String dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
ProgressState progress = ProgressState.NO_PROGRESS;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
- String dataverseName = null;
- String datasetName = null;
+ MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
String indexName = null;
+ // For external datasets: whether the auxiliary files index must be dropped as well
boolean dropFilesIndex = false;
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
- datasetName = stmtIndexDrop.getDatasetName().getValue();
- dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds == null) {
@@ -1465,20 +1480,21 @@
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.dropIndexEnd(dataverseName, dataverseName + "." + datasetName);
}
}
private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
+ String typeName = stmtTypeDrop.getTypeName().getValue();
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
+ MetadataLockManager.INSTANCE.dropTypeBegin(dataverseName, dataverseName + "." + typeName);
try {
- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
- String typeName = stmtTypeDrop.getTypeName().getValue();
Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
if (dt == null) {
if (!stmtTypeDrop.getIfExists())
@@ -1491,19 +1507,18 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.dropTypeEnd(dataverseName, dataverseName + "." + typeName);
}
}
private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
-
+ MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(nodegroupName);
try {
- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
if (ng == null) {
if (!stmtDelete.getIfExists())
@@ -1517,25 +1532,27 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(nodegroupName);
}
}
private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+ String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
+ String functionName = cfs.getaAterixFunction().getName();
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
-
+ MetadataLockManager.INSTANCE.functionStatementBegin(dataverse, dataverse + "." + functionName);
try {
- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
- String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
+
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
}
- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+ Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
+ cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
+ FunctionKind.SCALAR.toString());
MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1543,18 +1560,18 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.functionStatementEnd(dataverse, dataverse + "." + functionName);
}
}
private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
-
+ MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(), signature.getNamespace() + "."
+ + signature.getName());
try {
- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
if (function == null) {
if (!stmtDropFunction.getIfExists())
@@ -1567,24 +1584,25 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(), signature.getNamespace() + "."
+ + signature.getName());
}
}
private void handleLoadStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
throws Exception {
+ LoadStatement loadStmt = (LoadStatement) stmt;
+ String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
+ String datasetName = loadStmt.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
- LoadStatement loadStmt = (LoadStatement) stmt;
- String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
- .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
- loadStmt.dataIsAlreadySorted());
+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, datasetName,
+ loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
@@ -1615,25 +1633,27 @@
throw e;
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.modifyDatasetEnd(dataverseName, dataverseName + "." + datasetName);
}
}
private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ InsertStatement stmtInsert = (InsertStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtInsert.getDataverseName());
+ Query query = stmtInsert.getQuery();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName,
+ dataverseName + "." + stmtInsert.getDatasetName(), query.getDataverses(), query.getDatasets());
try {
metadataProvider.setWriteTransaction(true);
- InsertStatement stmtInsert = (InsertStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtInsert.getDataverseName());
CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
- JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+ .getValue(), query, stmtInsert.getVarCounter());
+ JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -1648,22 +1668,25 @@
}
throw e;
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.insertDeleteEnd(dataverseName,
+ dataverseName + "." + stmtInsert.getDatasetName(), query.getDataverses(), query.getDatasets());
}
}
private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE
+ .insertDeleteBegin(dataverseName, dataverseName + "." + stmtDelete.getDatasetName(),
+ stmtDelete.getDataverses(), stmtDelete.getDatasets());
try {
metadataProvider.setWriteTransaction(true);
- DeleteStatement stmtDelete = (DeleteStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getVarCounter(),
metadataProvider);
@@ -1682,7 +1705,9 @@
}
throw e;
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.insertDeleteEnd(dataverseName,
+ dataverseName + "." + stmtDelete.getDatasetName(), stmtDelete.getDataverses(),
+ stmtDelete.getDatasets());
}
}
@@ -1706,18 +1731,16 @@
private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ CreateFeedStatement cfs = (CreateFeedStatement) stmt;
+ String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+ String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
+ MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
- String dataverseName = null;
- String feedName = null;
String adaptorName = null;
Feed feed = null;
try {
- CreateFeedStatement cfs = (CreateFeedStatement) stmt;
- dataverseName = getActiveDataverseName(cfs.getDataverseName());
- feedName = cfs.getFeedName().getValue();
adaptorName = cfs.getAdaptorName();
feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -1738,20 +1761,20 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.createFeedEnd(dataverseName, dataverseName + "." + feedName);
}
}
private void handleDropFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
+ String feedName = stmtFeedDrop.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
+ MetadataLockManager.INSTANCE.dropFeedBegin(dataverseName, dataverseName + "." + feedName);
try {
- FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt;
- String dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
- String feedName = stmtFeedDrop.getFeedName().getValue();
Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
if (feed == null) {
if (!stmtFeedDrop.getIfExists()) {
@@ -1785,21 +1808,26 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.dropFeedEnd(dataverseName, dataverseName + "." + feedName);
}
}
private void handleConnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
+ String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+ String feedName = cfs.getFeedName();
+ String datasetName = cfs.getDatasetName().getValue();
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName
+ + "." + feedName);
boolean readLatchAcquired = true;
try {
- ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
- String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+
metadataProvider.setWriteTransaction(true);
CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), cfs
@@ -1859,7 +1887,8 @@
boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
.valueOf(waitForCompletionParam);
if (waitForCompletion) {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + feedName);
readLatchAcquired = false;
}
runJob(hcc, newJobSpec, waitForCompletion);
@@ -1870,23 +1899,24 @@
throw e;
} finally {
if (readLatchAcquired) {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + feedName);
}
}
}
private void handleDisconnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
+ String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+ String datasetName = cfs.getDatasetName().getValue();
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
-
+ MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + cfs.getFeedName());
try {
- DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
- String dataverseName = getActiveDataverseName(cfs.getDataverseName());
-
- String datasetName = cfs.getDatasetName().getValue();
Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
dataverseName, cfs.getDatasetName().getValue());
if (dataset == null) {
@@ -1925,24 +1955,23 @@
}
throw e;
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + cfs.getFeedName());
}
}
private void handleCompactStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
+ CompactStatement compactStatement = (CompactStatement) stmt;
+ String dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
+ String datasetName = compactStatement.getDatasetName().getValue();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE.compactBegin(dataverseName, dataverseName + "." + datasetName);
- String dataverseName = null;
- String datasetName = null;
List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
try {
- CompactStatement compactStatement = (CompactStatement) stmt;
- dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
- datasetName = compactStatement.getDatasetName().getValue();
Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (ds == null) {
throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
@@ -1991,7 +2020,7 @@
}
throw e;
} finally {
- releaseReadLatch();
+ MetadataLockManager.INSTANCE.compactEnd(dataverseName, dataverseName + "." + datasetName);
}
}
@@ -2001,7 +2030,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
+ MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets());
JobSpecification compiled = null;
try {
compiled = rewriteCompileQuery(metadataProvider, query, null);
@@ -2083,21 +2112,22 @@
}
throw e;
} finally {
- releaseReadLatch();
- // release locks aquired during compilation of the query
+ MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
+ // release external datasets' locks acquired during compilation of the query
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
}
}
private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireWriteLatch();
+ MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(ngName);
try {
- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
if (ng != null) {
if (!stmtCreateNodegroup.getIfNotExists())
@@ -2115,20 +2145,20 @@
abort(e, e, mdTxnCtx);
throw e;
} finally {
- releaseWriteLatch();
+ MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(ngName);
}
}
private void handleExternalDatasetRefreshStatement(AqlMetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc) throws Exception {
RefreshExternalDatasetStatement stmtRefresh = (RefreshExternalDatasetStatement) stmt;
+ String dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName());
+ String datasetName = stmtRefresh.getDatasetName().getValue();
ExternalDatasetTransactionState transactionState = ExternalDatasetTransactionState.COMMIT;
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- acquireWriteLatch();
+ MetadataLockManager.INSTANCE.refreshDatasetBegin(dataverseName, dataverseName + "." + datasetName);
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- String dataverseName = null;
- String datasetName = null;
JobSpecification spec = null;
Dataset ds = null;
List<ExternalFile> metadataFiles = null;
@@ -2140,8 +2170,6 @@
boolean lockAquired = false;
boolean success = false;
try {
- dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName());
- datasetName = stmtRefresh.getDatasetName().getValue();
ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
@@ -2338,10 +2366,10 @@
}
}
} finally {
- releaseWriteLatch();
if (lockAquired) {
ExternalDatasetsRegistry.INSTANCE.refreshEnd(ds, success);
}
+ MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
}
}
@@ -2393,22 +2421,6 @@
return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
}
- private void acquireWriteLatch() {
- MetadataManager.INSTANCE.acquireWriteLatch();
- }
-
- private void releaseWriteLatch() {
- MetadataManager.INSTANCE.releaseWriteLatch();
- }
-
- private void acquireReadLatch() {
- MetadataManager.INSTANCE.acquireReadLatch();
- }
-
- private void releaseReadLatch() {
- MetadataManager.INSTANCE.releaseReadLatch();
- }
-
private void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
try {
if (IS_DEBUG_MODE) {
@@ -2420,4 +2432,4 @@
throw new IllegalStateException(rootE);
}
}
-}
\ No newline at end of file
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java
index ded6946..f50b91e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.aql.expression;
+import java.util.List;
+
import edu.uci.ics.asterix.aql.base.Expression;
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
@@ -27,14 +29,18 @@
private Identifier datasetName;
private Expression condition;
private int varCounter;
+ private List<String> dataverses;
+ private List<String> datasets;
public DeleteStatement(VariableExpr vars, Identifier dataverseName, Identifier datasetName, Expression condition,
- int varCounter) {
+ int varCounter, List<String> dataverses, List<String> datasets) {
this.vars = vars;
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.condition = condition;
this.varCounter = varCounter;
+ this.dataverses = dataverses;
+ this.datasets = datasets;
}
@Override
@@ -72,4 +78,12 @@
visitor.visit(this, arg);
}
+ public List<String> getDataverses() {
+ return dataverses;
+ }
+
+ public List<String> getDatasets() {
+ return datasets;
+ }
+
}
\ No newline at end of file
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java
index 5f58a38..a22d4b4 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java
@@ -17,36 +17,38 @@
import java.util.Map;
public class ExternalDetailsDecl implements IDatasetDetailsDecl {
- private Map<String, String> properties;
- private String adapter;
- private Identifier nodegroupName;
- private String compactionPolicy;
+ private Map<String, String> properties;
+ private String adapter;
+ private Identifier nodegroupName;
+ private String compactionPolicy;
private Map<String, String> compactionPolicyProperties;
- public void setAdapter(String adapter) {
- this.adapter = adapter;
- }
+ public void setAdapter(String adapter) {
+ this.adapter = adapter;
+ }
- public void setProperties(Map<String, String> properties) {
- this.properties = properties;
- }
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
- public String getAdapter() {
- return adapter;
- }
+ public String getAdapter() {
+ return adapter;
+ }
- public Map<String, String> getProperties() {
- return properties;
- }
+ public Map<String, String> getProperties() {
+ return properties;
+ }
- public Identifier getNodegroupName() {
- return nodegroupName;
- }
+ @Override
+ public Identifier getNodegroupName() {
+ return nodegroupName;
+ }
- public void setNodegroupName(Identifier nodegroupName) {
- this.nodegroupName = nodegroupName;
- }
+ public void setNodegroupName(Identifier nodegroupName) {
+ this.nodegroupName = nodegroupName;
+ }
+ @Override
public String getCompactionPolicy() {
return compactionPolicy;
}
@@ -55,6 +57,7 @@
this.compactionPolicy = compactionPolicy;
}
+ @Override
public Map<String, String> getCompactionPolicyProperties() {
return compactionPolicyProperties;
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java
index 0f9a74c..51f2a1b 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java
@@ -14,6 +14,14 @@
*/
package edu.uci.ics.asterix.aql.expression;
+import java.util.Map;
+
public interface IDatasetDetailsDecl {
+ public Identifier getNodegroupName();
+
+ public String getCompactionPolicy();
+
+ public Map<String, String> getCompactionPolicyProperties();
+
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java
index 3be08b2..fc71a8f 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java
@@ -42,6 +42,7 @@
return partitioningExprs;
}
+ @Override
public Identifier getNodegroupName() {
return nodegroupName;
}
@@ -50,10 +51,12 @@
return autogenerated;
}
+ @Override
public String getCompactionPolicy() {
return compactionPolicy;
}
+ @Override
public Map<String, String> getCompactionPolicyProperties() {
return compactionPolicyProperties;
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java
index b8538b8..665f3b7 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java
@@ -14,6 +14,9 @@
*/
package edu.uci.ics.asterix.aql.expression;
+import java.util.ArrayList;
+import java.util.List;
+
import edu.uci.ics.asterix.aql.base.Expression;
import edu.uci.ics.asterix.aql.base.Statement;
import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
@@ -23,6 +26,8 @@
public class Query implements Statement {
private Expression body;
private int varCounter;
+ private List<String> dataverses = new ArrayList<String>();
+ private List<String> datasets = new ArrayList<String>();
public Expression getBody() {
return body;
@@ -54,4 +59,20 @@
public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
return visitor.visitQuery(this, arg);
}
+
+ public void setDataverses(List<String> dataverses) {
+ this.dataverses = dataverses;
+ }
+
+ public void setDatasets(List<String> datasets) {
+ this.datasets = datasets;
+ }
+
+ public List<String> getDataverses() {
+ return dataverses;
+ }
+
+ public List<String> getDatasets() {
+ return datasets;
+ }
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java
index d95596b..12c5de6 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.aql.parser;
+import java.util.List;
import java.util.Stack;
import edu.uci.ics.asterix.aql.context.Scope;
@@ -33,6 +34,9 @@
protected String defaultDataverse;
+ private List<String> dataverses;
+ private List<String> datasets;
+
protected void setInput(String s) {
inputLines = s.split("\n");
}
@@ -174,4 +178,31 @@
return extract.toString().trim();
}
+ public void addDataverse(String dataverseName) {
+ if (dataverses != null) {
+ dataverses.add(dataverseName);
+ }
+ }
+
+ public void addDataset(String datasetName) {
+ if (datasets != null) {
+ datasets.add(datasetName);
+ }
+ }
+
+ public void setDataverses(List<String> dataverses) {
+ this.dataverses = dataverses;
+ }
+
+ public void setDatasets(List<String> datasets) {
+ this.datasets = datasets;
+ }
+
+ public List<String> getDataverses() {
+ return dataverses;
+ }
+
+ public List<String> getDatasets() {
+ return datasets;
+ }
}
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 430ea7d..bb78860 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -629,7 +629,7 @@
}
// TODO use fctName.library
- String fqFunctionName = fctName.library == null ? fctName.function : fctName.library + "#" + fctName.function;
+ String fqFunctionName = fctName.library == null ? fctName.function : fctName.library + "#" + fctName.function;
return new FunctionSignature(fctName.dataverse, fqFunctionName, arity);
}
}
@@ -731,6 +731,10 @@
VariableExpr var = null;
Expression condition = null;
Pair<Identifier, Identifier> nameComponents;
+ // Collect the dataverses and datasets this statement references, for metadata lock acquisition
+ setDataverses(new ArrayList<String>());
+ setDatasets(new ArrayList<String>());
+
}
{
"delete" var = Variable()
@@ -740,7 +744,14 @@
"from" <DATASET> nameComponents = QualifiedName()
(<WHERE> condition = Expression())?
{
- return new DeleteStatement(var, nameComponents.first, nameComponents.second, condition, getVarCounter());
+ // Collect the dataverses and datasets that this statement needs to lock
+ List<String> dataverses = getDataverses();
+ List<String> datasets = getDatasets();
+ // detach the collection lists so later statements do not append to them
+ setDataverses(null);
+ setDatasets(null);
+ return new DeleteStatement(var, nameComponents.first, nameComponents.second,
+ condition, getVarCounter(), dataverses, datasets);
}
}
@@ -1233,6 +1244,9 @@
Query Query() throws ParseException:
{
Query query = new Query();
+ // point the parser's collection lists at this query's dataverses/datasets so referenced entities are recorded for locking
+ setDataverses(query.getDataverses());
+ setDatasets(query.getDatasets());
Expression expr;
}
{
@@ -1240,6 +1254,9 @@
{
query.setBody(expr);
query.setVarCounter(getVarCounter());
+ // detach the collection lists before returning the query object
+ setDataverses(null);
+ setDatasets(null);
return query;
}
@@ -1833,6 +1850,12 @@
LiteralExpr ds = new LiteralExpr();
ds.setValue( new StringLiteral(name) );
nameArg = ds;
+ if (arg2 != null) {
+ addDataverse(arg1.toString());
+ addDataset(name);
+ } else {
+ addDataset(defaultDataverse + "." + name);
+ }
}
| ( <LEFTPAREN> nameArg = Expression() <RIGHTPAREN> ) )
{
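The net effect of the grammar changes: Query() and the delete production hand
their dataverse/dataset lists to ScopeChecker, every dataset reference parsed
inside the statement is recorded under its dataverse-qualified name, and the
lists travel to the translator, which passes them to
MetadataLockManager.insertDeleteBegin/queryBegin. A hypothetical example,
shown as comments:

    // Parsing:  delete $c from dataset tpch.Customers where $c.age > 90
    // records via ScopeChecker.addDataverse/addDataset:
    //   dataverses = ["tpch"]
    //   datasets   = ["tpch.Customers"]
    // and both lists are handed to new DeleteStatement(..., dataverses, datasets).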
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetLock.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetLock.java
new file mode 100644
index 0000000..6ca5c42
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetLock.java
@@ -0,0 +1,88 @@
+package edu.uci.ics.asterix.metadata.utils;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+
+public class DatasetLock {
+
+ private ReentrantReadWriteLock dsLock;
+ private ReentrantReadWriteLock dsModifyLock;
+ private AMutableInt32 indexBuildCounter;
+
+ public DatasetLock() {
+ dsLock = new ReentrantReadWriteLock(true);
+ dsModifyLock = new ReentrantReadWriteLock(true);
+ indexBuildCounter = new AMutableInt32(0);
+ }
+
+ public void acquireReadLock() {
+ // query
+ // build index
+ // insert
+ dsLock.readLock().lock();
+ }
+
+ public void releaseReadLock() {
+ // query
+ // build index
+ // insert
+ dsLock.readLock().unlock();
+ }
+
+ public void acquireWriteLock() {
+ // create ds
+ // delete ds
+ // drop index
+ dsLock.writeLock().lock();
+ }
+
+ public void releaseWriteLock() {
+ // create ds
+ // delete ds
+ // drop index
+ dsLock.writeLock().unlock();
+ }
+
+ public void acquireReadModifyLock() {
+ // insert
+ dsModifyLock.readLock().lock();
+ }
+
+ public void releaseReadModifyLock() {
+ // insert
+ dsModifyLock.readLock().unlock();
+ }
+
+ public void acquireWriteModifyLock() {
+ // Build index statement
+ synchronized (indexBuildCounter) {
+ if (indexBuildCounter.getIntegerValue() > 0) {
+ indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1);
+ } else {
+ dsModifyLock.writeLock().lock();
+ indexBuildCounter.setValue(1);
+ }
+ }
+ }
+
+ public void releaseWriteModifyLock() {
+ // Build index statement
+ synchronized (indexBuildCounter) {
+ if (indexBuildCounter.getIntegerValue() == 1) {
+ dsModifyLock.writeLock().unlock();
+ }
+ indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1);
+ }
+ }
+
+ public void acquireRefreshLock() {
+ // Refresh External Dataset statement
+ dsModifyLock.writeLock().lock();
+ }
+
+ public void releaseRefreshLock() {
+ // Refresh External Dataset statement
+ dsModifyLock.writeLock().unlock();
+ }
+}
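DatasetLock layers two fair ReentrantReadWriteLocks: dsLock orders queries,
inserts, and index builds against dataset create/drop and index drop, while
dsModifyLock orders modifications (inserts take it shared, external-dataset
refresh takes it exclusive). The indexBuildCounter lets concurrent CREATE INDEX
statements share one exclusive hold on dsModifyLock: the first builder takes
the write lock, later builders only bump the counter, and the lock is released
when the count drains to zero. A hedged usage sketch (how a hypothetical
create-index path might use it; the actual call sites live in
MetadataLockManager):

    DatasetLock lock = new DatasetLock();
    lock.acquireReadLock();        // index build shares dsLock with queries/inserts
    lock.acquireWriteModifyLock(); // excludes inserts; shared among index builders
    try {
        // ... build the index ...
    } finally {
        lock.releaseWriteModifyLock();
        lock.releaseReadLock();
    }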
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java
index 79d695c..b6ab546 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java
@@ -67,13 +67,21 @@
datasetLock.writeLock().unlock();
}
- public int buildIndexBegin() {
- datasetLock.readLock().lock();
+ public synchronized int buildIndexBegin(boolean isFirstIndex) {
+ if (isFirstIndex) {
+ datasetLock.writeLock().lock();
+ } else {
+ datasetLock.readLock().lock();
+ }
return version;
}
- public void buildIndexEnd() {
- datasetLock.readLock().unlock();
+ public void buildIndexEnd(boolean isFirstIndex) {
+ if (isFirstIndex) {
+ datasetLock.writeLock().unlock();
+ } else {
+ datasetLock.readLock().unlock();
+ }
}
public int queryBegin() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java
index 84d25d3..528dd50 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java
@@ -4,10 +4,10 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.Dataverse;
/**
* This is a singelton class used to maintain the version of each external dataset with indexes
@@ -17,10 +17,10 @@
*/
public class ExternalDatasetsRegistry {
public static ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
- private HashMap<String, HashMap<String, ExternalDatasetAccessManager>> globalRegister;
+ private ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
private ExternalDatasetsRegistry() {
- globalRegister = new HashMap<String, HashMap<String, ExternalDatasetAccessManager>>();
+ globalRegister = new ConcurrentHashMap<String, ExternalDatasetAccessManager>();
}
/**
@@ -30,23 +30,11 @@
* @return
*/
public int getDatasetVersion(Dataset dataset) {
- HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
- ExternalDatasetAccessManager datasetAccessMgr;
- synchronized (this) {
- dataverseRegister = globalRegister.get(dataset.getDataverseName());
- if (dataverseRegister == null) {
- // Create a register for the dataverse, and put the dataset their with the initial value of 0
- dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- globalRegister.put(dataset.getDataverseName(), dataverseRegister);
- } else {
- datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
- if (datasetAccessMgr == null) {
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- }
- }
+ String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+ ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
+ if (datasetAccessMgr == null) {
+ globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
+ datasetAccessMgr = globalRegister.get(key);
}
return datasetAccessMgr.getVersion();
}
@@ -68,31 +56,10 @@
}
}
- HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
- ExternalDatasetAccessManager datasetAccessMgr;
- dataverseRegister = globalRegister.get(dataset.getDataverseName());
- if (dataverseRegister == null) {
- synchronized (this) {
- // A second time synchronized just to make sure
- dataverseRegister = globalRegister.get(dataset.getDataverseName());
- if (dataverseRegister == null) {
- // Create a register for the dataverse, and put the dataset their with the initial value of 0
- dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
- globalRegister.put(dataset.getDataverseName(), dataverseRegister);
- }
- }
- }
-
- datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
+ ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(lockKey);
if (datasetAccessMgr == null) {
- synchronized (this) {
- // a second time synchronized just to make sure
- datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
- if (datasetAccessMgr == null) {
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- }
- }
+ globalRegister.putIfAbsent(lockKey, new ExternalDatasetAccessManager());
+ datasetAccessMgr = globalRegister.get(lockKey);
}
// aquire the correct lock
@@ -102,74 +69,39 @@
}
public void refreshBegin(Dataset dataset) {
- HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
- ExternalDatasetAccessManager datasetAccessMgr;
- synchronized (this) {
- dataverseRegister = globalRegister.get(dataset.getDataverseName());
- if (dataverseRegister == null) {
- // Create a register for the dataverse, and put the dataset their with the initial value of 0
- dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- globalRegister.put(dataset.getDataverseName(), dataverseRegister);
- } else {
- datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
- if (datasetAccessMgr == null) {
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- }
- }
+ String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+ ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
+ if (datasetAccessMgr == null) {
+ // ConcurrentHashMap.put returns the previous mapping (null here), so use
+ // putIfAbsent followed by get to obtain the instance actually in the map
+ globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
+ datasetAccessMgr = globalRegister.get(key);
}
// aquire the correct lock
datasetAccessMgr.refreshBegin();
}
- public synchronized void removeDatasetInfo(Dataset dataset) {
- HashMap<String, ExternalDatasetAccessManager> dataverseRegister = globalRegister
- .get(dataset.getDataverseName());
- if (dataverseRegister != null) {
- dataverseRegister.remove(dataset.getDatasetName());
- }
- }
-
- public synchronized void removeDataverse(Dataverse dataverse) {
- globalRegister.remove(dataverse.getDataverseName());
+ public void removeDatasetInfo(Dataset dataset) {
+ String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+ globalRegister.remove(key);
}
public void refreshEnd(Dataset dataset, boolean success) {
- HashMap<String, ExternalDatasetAccessManager> dataverseRegistry = globalRegister
- .get(dataset.getDataverseName());
- ExternalDatasetAccessManager datasetLockManager = dataverseRegistry.get(dataset.getDatasetName());
- datasetLockManager.refreshEnd(success);
+ String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+ globalRegister.get(key).refreshEnd(success);
}
- public void buildIndexBegin(Dataset dataset) {
- HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
- ExternalDatasetAccessManager datasetAccessMgr;
- synchronized (this) {
- dataverseRegister = globalRegister.get(dataset.getDataverseName());
- if (dataverseRegister == null) {
- // Create a register for the dataverse, and put the dataset their with the initial value of 0
- dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- globalRegister.put(dataset.getDataverseName(), dataverseRegister);
- } else {
- datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
- if (datasetAccessMgr == null) {
- datasetAccessMgr = new ExternalDatasetAccessManager();
- dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
- }
- }
+ public void buildIndexBegin(Dataset dataset, boolean firstIndex) {
+ String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+ ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
+ if (datasetAccessMgr == null) {
+ globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
+ datasetAccessMgr = globalRegister.get(key);
}
- datasetAccessMgr.buildIndexBegin();
+        // acquire the correct lock
+ datasetAccessMgr.buildIndexBegin(firstIndex);
}
- public void buildIndexEnd(Dataset dataset) {
- HashMap<String, ExternalDatasetAccessManager> dataverseRegistry = globalRegister
- .get(dataset.getDataverseName());
- ExternalDatasetAccessManager datasetLockManager = dataverseRegistry.get(dataset.getDatasetName());
- datasetLockManager.buildIndexEnd();
+ public void buildIndexEnd(Dataset dataset, boolean firstIndex) {
+ String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+ globalRegister.get(key).buildIndexEnd(firstIndex);
}
public void releaseAcquiredLocks(AqlMetadataProvider metadataProvider) {
@@ -180,12 +112,7 @@
// if dataset was accessed already by this job, return the registered version
Set<Entry<String, Integer>> aquiredLocks = locks.entrySet();
for (Entry<String, Integer> entry : aquiredLocks) {
- //Get dataverse name
- String dvName = entry.getKey().substring(0, entry.getKey().indexOf("."));
- String dsName = entry.getKey().substring(entry.getKey().indexOf(".") + 1);
- HashMap<String, ExternalDatasetAccessManager> dataverseRegistry = globalRegister.get(dvName);
- ExternalDatasetAccessManager datasetLockManager = dataverseRegistry.get(dsName);
- datasetLockManager.queryEnd(entry.getValue());
+ globalRegister.get(entry.getKey()).queryEnd(entry.getValue());
}
locks.clear();
}
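
The hunks above collapse the two-level dataverse-to-dataset registry into a single ConcurrentHashMap keyed by "dataverse.dataset", dropping the synchronized blocks. The lazy-initialization idiom they converge on is sketched below with an illustrative registry type that is not part of the patch; putIfAbsent followed by a re-read guarantees that racing first callers all observe the same manager instance, whereas put would hand back the previous (null) mapping:

    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative stand-in for ExternalDatasetAccessManager.
    class AccessManager {
    }

    class FlatRegistry {
        private final ConcurrentHashMap<String, AccessManager> register =
                new ConcurrentHashMap<String, AccessManager>();

        AccessManager getOrCreate(String dataverseName, String datasetName) {
            String key = dataverseName + "." + datasetName;
            AccessManager mgr = register.get(key);
            if (mgr == null) {
                // putIfAbsent either installs our instance or keeps one that a
                // concurrent caller installed first; re-read to pick the winner.
                register.putIfAbsent(key, new AccessManager());
                mgr = register.get(key);
            }
            return mgr;
        }
    }
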
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
new file mode 100644
index 0000000..02e40b2
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
@@ -0,0 +1,512 @@
+package edu.uci.ics.asterix.metadata.utils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+
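+/**
+ * Fine-grained metadata locks: one concurrent table per entity kind, each
+ * keyed by fully qualified name and populated lazily on first use.
+ */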
+public class MetadataLockManager {
+
+    public static final MetadataLockManager INSTANCE = new MetadataLockManager();
+ private ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
+ private ConcurrentHashMap<String, DatasetLock> datasetsLocks;
+ private ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
+ private ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
+ private ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
+ private ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
+ private ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+
+ private MetadataLockManager() {
+ dataversesLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ datasetsLocks = new ConcurrentHashMap<String, DatasetLock>();
+ functionsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ nodeGroupsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ feedsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ compactionPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ dataTypeLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+ }
+
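+    // Every acquire method below follows the same pattern: look the lock up
+    // and, on a miss, call putIfAbsent then re-read with get, so concurrent
+    // first callers settle on a single lock instance. Entries are never
+    // removed from these tables, so the re-read cannot return null.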
+ public void acquireDataverseReadLock(String dataverseName) {
+ ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
+ if (dvLock == null) {
+ dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
+ dvLock = dataversesLocks.get(dataverseName);
+ }
+ dvLock.readLock().lock();
+ }
+
+ public void releaseDataverseReadLock(String dataverseName) {
+ dataversesLocks.get(dataverseName).readLock().unlock();
+ }
+
+ public void acquireDataverseWriteLock(String dataverseName) {
+ ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
+ if (dvLock == null) {
+ dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
+ dvLock = dataversesLocks.get(dataverseName);
+ }
+ dvLock.writeLock().lock();
+ }
+
+ public void releaseDataverseWriteLock(String dataverseName) {
+ dataversesLocks.get(dataverseName).writeLock().unlock();
+ }
+
+ public void acquireDatasetReadLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ if (dsLock == null) {
+ datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+ dsLock = datasetsLocks.get(datasetName);
+ }
+ dsLock.acquireReadLock();
+ }
+
+ public void releaseDatasetReadLock(String datasetName) {
+ datasetsLocks.get(datasetName).releaseReadLock();
+ }
+
+ public void acquireDatasetWriteLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ if (dsLock == null) {
+ datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+ dsLock = datasetsLocks.get(datasetName);
+ }
+ dsLock.acquireWriteLock();
+ }
+
+ public void releaseDatasetWriteLock(String datasetName) {
+ datasetsLocks.get(datasetName).releaseWriteLock();
+ }
+
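+    // The composite dataset locks below (modify, create-index, refresh) take
+    // the plain read lock first and the mode-specific DatasetLock second; the
+    // matching release methods unlock in the reverse order.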
+ public void acquireDatasetModifyLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ if (dsLock == null) {
+ datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+ dsLock = datasetsLocks.get(datasetName);
+ }
+ dsLock.acquireReadLock();
+ dsLock.acquireReadModifyLock();
+ }
+
+ public void releaseDatasetModifyLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ dsLock.releaseReadModifyLock();
+ dsLock.releaseReadLock();
+ }
+
+ public void acquireDatasetCreateIndexLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ if (dsLock == null) {
+ datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+ dsLock = datasetsLocks.get(datasetName);
+ }
+ dsLock.acquireReadLock();
+ dsLock.acquireWriteModifyLock();
+ }
+
+ public void releaseDatasetCreateIndexLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ dsLock.releaseWriteModifyLock();
+ dsLock.releaseReadLock();
+ }
+
+ public void acquireExternalDatasetRefreshLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ if (dsLock == null) {
+ datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+ dsLock = datasetsLocks.get(datasetName);
+ }
+ dsLock.acquireReadLock();
+ dsLock.acquireRefreshLock();
+ }
+
+ public void releaseExternalDatasetRefreshLock(String datasetName) {
+ DatasetLock dsLock = datasetsLocks.get(datasetName);
+ dsLock.releaseRefreshLock();
+ dsLock.releaseReadLock();
+ }
+
+ public void acquireFunctionReadLock(String functionName) {
+ ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
+ if (fLock == null) {
+ functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
+ fLock = functionsLocks.get(functionName);
+ }
+ fLock.readLock().lock();
+ }
+
+ public void releaseFunctionReadLock(String functionName) {
+ functionsLocks.get(functionName).readLock().unlock();
+ }
+
+ public void acquireFunctionWriteLock(String functionName) {
+ ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
+ if (fLock == null) {
+ functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
+ fLock = functionsLocks.get(functionName);
+ }
+ fLock.writeLock().lock();
+ }
+
+ public void releaseFunctionWriteLock(String functionName) {
+ functionsLocks.get(functionName).writeLock().unlock();
+ }
+
+ public void acquireNodeGroupReadLock(String nodeGroupName) {
+ ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
+ if (ngLock == null) {
+ nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
+ ngLock = nodeGroupsLocks.get(nodeGroupName);
+ }
+ ngLock.readLock().lock();
+ }
+
+ public void releaseNodeGroupReadLock(String nodeGroupName) {
+ nodeGroupsLocks.get(nodeGroupName).readLock().unlock();
+ }
+
+ public void acquireNodeGroupWriteLock(String nodeGroupName) {
+ ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
+ if (ngLock == null) {
+ nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
+ ngLock = nodeGroupsLocks.get(nodeGroupName);
+ }
+ ngLock.writeLock().lock();
+ }
+
+ public void releaseNodeGroupWriteLock(String nodeGroupName) {
+ nodeGroupsLocks.get(nodeGroupName).writeLock().unlock();
+ }
+
+ public void acquireFeedReadLock(String feedName) {
+ ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
+ if (fLock == null) {
+ feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
+ fLock = feedsLocks.get(feedName);
+ }
+ fLock.readLock().lock();
+ }
+
+ public void releaseFeedReadLock(String feedName) {
+ feedsLocks.get(feedName).readLock().unlock();
+ }
+
+ public void acquireFeedWriteLock(String feedName) {
+ ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
+ if (fLock == null) {
+ feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
+ fLock = feedsLocks.get(feedName);
+ }
+ fLock.writeLock().lock();
+ }
+
+ public void releaseFeedWriteLock(String feedName) {
+ feedsLocks.get(feedName).writeLock().unlock();
+ }
+
+ public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
+ ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+ if (compactionPolicyLock == null) {
+ compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
+ compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+ }
+ compactionPolicyLock.readLock().lock();
+ }
+
+ public void releaseCompactionPolicyReadLock(String compactionPolicyName) {
+ compactionPolicyLocks.get(compactionPolicyName).readLock().unlock();
+ }
+
+ public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
+ ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+ if (compactionPolicyLock == null) {
+ compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
+ compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+ }
+ compactionPolicyLock.writeLock().lock();
+ }
+
+ public void releaseCompactionPolicyWriteLock(String compactionPolicyName) {
+ compactionPolicyLocks.get(compactionPolicyName).writeLock().unlock();
+ }
+
+ public void acquireDataTypeReadLock(String dataTypeName) {
+ ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
+ if (dataTypeLock == null) {
+ dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
+ dataTypeLock = dataTypeLocks.get(dataTypeName);
+ }
+ dataTypeLock.readLock().lock();
+ }
+
+ public void releaseDataTypeReadLock(String dataTypeName) {
+ dataTypeLocks.get(dataTypeName).readLock().unlock();
+ }
+
+ public void acquireDataTypeWriteLock(String dataTypeName) {
+ ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
+ if (dataTypeLock == null) {
+ dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
+ dataTypeLock = dataTypeLocks.get(dataTypeName);
+ }
+ dataTypeLock.writeLock().lock();
+ }
+
+ public void releaseDataTypeWriteLock(String dataTypeName) {
+ dataTypeLocks.get(dataTypeName).writeLock().unlock();
+ }
+
+ public void createDatasetBegin(String dataverseName, String itemTypeFullyQualifiedName, String nodeGroupName,
+ String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDataTypeReadLock(itemTypeFullyQualifiedName);
+ acquireNodeGroupReadLock(nodeGroupName);
+ if (!isDefaultCompactionPolicy) {
+ acquireCompactionPolicyReadLock(compactionPolicyName);
+ }
+ acquireDatasetWriteLock(datasetFullyQualifiedName);
+ }
+
+ public void createDatasetEnd(String dataverseName, String itemTypeFullyQualifiedName, String nodeGroupName,
+ String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
+ releaseDatasetWriteLock(datasetFullyQualifiedName);
+ if (!isDefaultCompactionPolicy) {
+ releaseCompactionPolicyReadLock(compactionPolicyName);
+ }
+ releaseNodeGroupReadLock(nodeGroupName);
+ releaseDataTypeReadLock(itemTypeFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void createIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetCreateIndexLock(datasetFullyQualifiedName);
+ }
+
+ public void createIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
+ releaseDatasetCreateIndexLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void createTypeBegin(String dataverseName, String itemTypeFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDataTypeWriteLock(itemTypeFullyQualifiedName);
+ }
+
+ public void createTypeEnd(String dataverseName, String itemTypeFullyQualifiedName) {
+ releaseDataTypeWriteLock(itemTypeFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void dropDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetWriteLock(datasetFullyQualifiedName);
+ }
+
+ public void dropDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
+ releaseDatasetWriteLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void dropIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetWriteLock(datasetFullyQualifiedName);
+ }
+
+ public void dropIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
+ releaseDatasetWriteLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void dropTypeBegin(String dataverseName, String dataTypeFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDataTypeWriteLock(dataTypeFullyQualifiedName);
+ }
+
+ public void dropTypeEnd(String dataverseName, String dataTypeFullyQualifiedName) {
+ releaseDataTypeWriteLock(dataTypeFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void functionStatementBegin(String dataverseName, String functionFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireFunctionWriteLock(functionFullyQualifiedName);
+ }
+
+ public void functionStatementEnd(String dataverseName, String functionFullyQualifiedName) {
+ releaseFunctionWriteLock(functionFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void modifyDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetModifyLock(datasetFullyQualifiedName);
+ }
+
+ public void modifyDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
+ releaseDatasetModifyLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
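+    // Batch acquisition for insert/delete: the caller's dataverse and dataset
+    // lists are sorted and de-duplicated so that every job takes its locks in
+    // one global order, which prevents deadlock between concurrent jobs.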
+ public void insertDeleteBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+ List<String> datasets) {
+ dataverses.add(dataverseName);
+ datasets.add(datasetFullyQualifiedName);
+ Collections.sort(dataverses);
+ Collections.sort(datasets);
+
+ String previous = null;
+ for (int i = 0; i < dataverses.size(); i++) {
+ String current = dataverses.get(i);
+ if (!current.equals(previous)) {
+ acquireDataverseReadLock(current);
+ previous = current;
+ }
+ }
+
+        previous = null; // restart duplicate-skipping for the dataset list
+        for (int i = 0; i < datasets.size(); i++) {
+ String current = datasets.get(i);
+ if (!current.equals(previous)) {
+ if (current.equals(datasetFullyQualifiedName)) {
+ acquireDatasetModifyLock(current);
+ } else {
+ acquireDatasetReadLock(current);
+ }
+ previous = current;
+ }
+ }
+ }
+
+ public void insertDeleteEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+ List<String> datasets) {
+ String previous = null;
+ for (int i = dataverses.size() - 1; i >= 0; i--) {
+ String current = dataverses.get(i);
+ if (!current.equals(previous)) {
+ releaseDataverseReadLock(current);
+ previous = current;
+ }
+ }
+        previous = null;
+        for (int i = datasets.size() - 1; i >= 0; i--) {
+ String current = datasets.get(i);
+ if (!current.equals(previous)) {
+ if (current.equals(datasetFullyQualifiedName)) {
+ releaseDatasetModifyLock(current);
+ } else {
+ releaseDatasetReadLock(current);
+ }
+ previous = current;
+ }
+ }
+ }
+
+ public void dropFeedBegin(String dataverseName, String feedFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireFeedWriteLock(feedFullyQualifiedName);
+ }
+
+ public void dropFeedEnd(String dataverseName, String feedFullyQualifiedName) {
+ releaseFeedWriteLock(feedFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireFeedWriteLock(feedFullyQualifiedName);
+ }
+
+ public void createFeedEnd(String dataverseName, String feedFullyQualifiedName) {
+ releaseFeedWriteLock(feedFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void connectFeedBegin(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetReadLock(datasetFullyQualifiedName);
+ acquireFeedReadLock(feedFullyQualifiedName);
+ }
+
+ public void connectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+ releaseFeedReadLock(feedFullyQualifiedName);
+ releaseDatasetReadLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
+ String feedFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetReadLock(datasetFullyQualifiedName);
+ acquireFeedReadLock(feedFullyQualifiedName);
+ }
+
+ public void disconnectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+ releaseFeedReadLock(feedFullyQualifiedName);
+ releaseDatasetReadLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
+ public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireDatasetReadLock(datasetFullyQualifiedName);
+ }
+
+ public void compactEnd(String dataverseName, String datasetFullyQualifiedName) {
+ releaseDatasetReadLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+
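+    // queryBegin/queryEnd apply the same sorted, de-duplicated protocol as
+    // insertDeleteBegin/insertDeleteEnd, but take only read locks.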
+ public void queryBegin(Dataverse dataverse, List<String> dataverses, List<String> datasets) {
+ if (dataverse != null) {
+ dataverses.add(dataverse.getDataverseName());
+ }
+ Collections.sort(dataverses);
+ Collections.sort(datasets);
+
+ String previous = null;
+ for (int i = 0; i < dataverses.size(); i++) {
+ String current = dataverses.get(i);
+ if (!current.equals(previous)) {
+ acquireDataverseReadLock(current);
+ previous = current;
+ }
+ }
+
+        previous = null; // restart duplicate-skipping for the dataset list
+        for (int i = 0; i < datasets.size(); i++) {
+ String current = datasets.get(i);
+ if (!current.equals(previous)) {
+ acquireDatasetReadLock(current);
+ previous = current;
+ }
+ }
+ }
+
+ public void queryEnd(List<String> dataverses, List<String> datasets) {
+ String previous = null;
+ for (int i = dataverses.size() - 1; i >= 0; i--) {
+ String current = dataverses.get(i);
+ if (!current.equals(previous)) {
+ releaseDataverseReadLock(current);
+ previous = current;
+ }
+ }
+        previous = null;
+        for (int i = datasets.size() - 1; i >= 0; i--) {
+ String current = datasets.get(i);
+ if (!current.equals(previous)) {
+ releaseDatasetReadLock(current);
+ previous = current;
+ }
+ }
+ }
+
+ public void refreshDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
+ acquireDataverseReadLock(dataverseName);
+ acquireExternalDatasetRefreshLock(datasetFullyQualifiedName);
+ }
+
+ public void refreshDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
+ releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
+ releaseDataverseReadLock(dataverseName);
+ }
+}
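
Each begin/end pair is meant to bracket a whole statement, with the end call made in a finally block so that an aborted metadata transaction still releases its locks. A minimal usage sketch, with a hypothetical handler name and the transaction logic elided:

    import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;

    class DropDatasetHandlerSketch {
        void handleDropDataset(String dataverseName, String datasetFullyQualifiedName) throws Exception {
            MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, datasetFullyQualifiedName);
            try {
                // begin a metadata transaction, drop the dataset, commit
            } finally {
                MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, datasetFullyQualifiedName);
            }
        }
    }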