remove global metadata lock

Change-Id: Id05ff463fee356b3270b53d0b3137c4b1bc3d830
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/84
Reviewed-by: Sattam Alsubaiee <salsubaiee@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 600295c..c121da3 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -21,6 +21,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Index;
@@ -256,7 +257,8 @@
         List<Index> indexCandidates = new ArrayList<Index>();
         // Add an index to the candidates if one of the indexed fields is fieldName
         for (Index index : datasetIndexes) {
-            if (index.getKeyFieldNames().contains(fieldName)) {
+            // Need to also verify the index is pending no op
+            if (index.getKeyFieldNames().contains(fieldName) && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
                 indexCandidates.add(index);
             }
         }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index b6d09b1..e0635c6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -110,6 +110,7 @@
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.metadata.utils.ExternalDatasetsRegistry;
+import edu.uci.ics.asterix.metadata.utils.MetadataLockManager;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -364,14 +365,12 @@
 
     private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt)
             throws Exception {
-
+        DataverseDecl dvd = (DataverseDecl) stmt;
+        String dvName = dvd.getDataverseName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
-
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
         try {
-            DataverseDecl dvd = (DataverseDecl) stmt;
-            String dvName = dvd.getDataverseName().getValue();
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
             if (dv == null) {
                 throw new MetadataException("Unknown dataverse " + dvName);
@@ -382,19 +381,19 @@
             abort(e, e, mdTxnCtx);
             throw new MetadataException(e);
         } finally {
-            releaseReadLatch();
+            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
         }
     }
 
     private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
+        CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
+        String dvName = stmtCreateDataverse.getDataverseName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
 
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
         try {
-            CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-            String dvName = stmtCreateDataverse.getDataverseName().getValue();
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
             if (dv != null) {
                 if (stmtCreateDataverse.getIfNotExists()) {
@@ -411,7 +410,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
         }
     }
 
@@ -436,21 +435,25 @@
             IHyracksClientConnection hcc) throws AsterixException, Exception {
 
         ProgressState progress = ProgressState.NO_PROGRESS;
+        DatasetDecl dd = (DatasetDecl) stmt;
+        String dataverseName = getActiveDataverseName(dd.getDataverse());
+        String datasetName = dd.getName().getValue();
+        DatasetType dsType = dd.getDatasetType();
+        String itemTypeName = dd.getItemTypeName().getValue();
+        Identifier ngNameId = dd.getDatasetDetailsDecl().getNodegroupName();
+        String nodegroupName = getNodeGroupName(ngNameId, dd, dataverseName);
+        String compactionPolicy = dd.getDatasetDetailsDecl().getCompactionPolicy();
+        Map<String, String> compactionPolicyProperties = dd.getDatasetDetailsDecl().getCompactionPolicyProperties();
+        boolean defaultCompactionPolicy = (compactionPolicy == null);
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
 
-        String dataverseName = null;
-        String datasetName = null;
+        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, dataverseName + "." + itemTypeName,
+                nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
         Dataset dataset = null;
         try {
-            DatasetDecl dd = (DatasetDecl) stmt;
-            dataverseName = getActiveDataverseName(dd.getDataverse());
-            datasetName = dd.getName().getValue();
-
-            DatasetType dsType = dd.getDatasetType();
-            String itemTypeName = dd.getItemTypeName().getValue();
 
             IDatasetDetails datasetDetails = null;
             Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
@@ -480,13 +483,8 @@
                     ARecordType aRecordType = (ARecordType) itemType;
                     aRecordType.validatePartitioningExpressions(partitioningExprs, autogenerated);
 
-                    Identifier ngNameId = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName();
                     String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd,
                             dataverseName, mdTxnCtx);
-
-                    String compactionPolicy = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getCompactionPolicy();
-                    Map<String, String> compactionPolicyProperties = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getCompactionPolicyProperties();
                     if (compactionPolicy == null) {
                         compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
                         compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
@@ -505,12 +503,9 @@
                 case EXTERNAL: {
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
                     Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-                    Identifier ngNameId = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName();
+
                     String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd,
                             dataverseName, mdTxnCtx);
-                    String compactionPolicy = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getCompactionPolicy();
-                    Map<String, String> compactionPolicyProperties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getCompactionPolicyProperties();
                     if (compactionPolicy == null) {
                         compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
                         compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
@@ -605,7 +600,20 @@
 
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, dataverseName + "." + itemTypeName,
+                    nodegroupName, compactionPolicy, dataverseName + "." + datasetName, defaultCompactionPolicy);
+        }
+    }
+
+    private String getNodeGroupName(Identifier ngNameId, DatasetDecl dd, String dataverse) {
+        if (ngNameId != null) {
+            return ngNameId.getValue();
+        }
+        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+        if (hintValue == null) {
+            return MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
+        } else {
+            return (dataverse + ":" + dd.getName().getValue());
         }
     }
 
@@ -664,13 +672,16 @@
     private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
+        CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
+        String datasetName = stmtCreateIndex.getDatasetName().getValue();
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
 
-        String dataverseName = null;
-        String datasetName = null;
+        MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
         String indexName = null;
         JobSpecification spec = null;
         Dataset ds = null;
@@ -681,9 +692,6 @@
         Index filesIndex = null;
         boolean datasetLocked = false;
         try {
-            CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-            dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
-            datasetName = stmtCreateIndex.getDatasetName().getValue();
 
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
@@ -738,13 +746,24 @@
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
                     throw new AlgebricksException("external dataset index name is invalid");
                 }
-                // lock external dataset
-                ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds);
-                datasetLocked = true;
+
                 // Check if the files index exist
                 filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                         datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                 firstExternalDatasetIndex = (filesIndex == null);
+                // lock external dataset
+                ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+                datasetLocked = true;
+                if (firstExternalDatasetIndex) {
+                    // verify that no one has created an index before we acquire the lock
+                    filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                            dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
+                    if (filesIndex != null) {
+                        ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
+                        firstExternalDatasetIndex = false;
+                        ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
+                    }
+                }
                 if (firstExternalDatasetIndex) {
                     // Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
@@ -914,23 +933,22 @@
             }
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
             if (datasetLocked) {
-                ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds);
+                ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
             }
         }
     }
 
     private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
-
+        TypeDecl stmtCreateType = (TypeDecl) stmt;
+        String dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
+        String typeName = stmtCreateType.getIdent().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
-
+        MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
         try {
-            TypeDecl stmtCreateType = (TypeDecl) stmt;
-            String dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
-            String typeName = stmtCreateType.getIdent().getValue();
+
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
                 throw new AlgebricksException("Unknown dataverse " + dataverseName);
@@ -956,24 +974,22 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
         }
     }
 
     private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
+        DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
+        String dataverseName = stmtDelete.getDataverseName().getValue();
 
         ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
-
-        String dataverseName = null;
+        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-            DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-            dataverseName = stmtDelete.getDataverseName().getValue();
 
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
@@ -1041,6 +1057,7 @@
                                     datasets.get(j)));
                         }
                     }
+                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
@@ -1069,7 +1086,6 @@
             if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
                 activeDefaultDataverse = null;
             }
-            ExternalDatasetsRegistry.INSTANCE.removeDataverse(dv);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -1107,26 +1123,24 @@
 
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
         }
     }
 
     private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
+        DropStatement stmtDelete = (DropStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
+        String datasetName = stmtDelete.getDatasetName().getValue();
 
         ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
 
-        String dataverseName = null;
-        String datasetName = null;
+        MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-            DropStatement stmtDelete = (DropStatement) stmt;
-            dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
-            datasetName = stmtDelete.getDatasetName().getValue();
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
@@ -1194,6 +1208,7 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             } else {
                 // External dataset
+                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
                 //#. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
@@ -1275,28 +1290,28 @@
 
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
 
     private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
+        String datasetName = stmtIndexDrop.getDatasetName().getValue();
+        String dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
         ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
 
-        String dataverseName = null;
-        String datasetName = null;
+        MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
+
         String indexName = null;
+        // For external index
         boolean dropFilesIndex = false;
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-            IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-            datasetName = stmtIndexDrop.getDatasetName().getValue();
-            dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
 
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
@@ -1465,20 +1480,21 @@
             throw e;
 
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.dropIndexEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
 
     private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
+        TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
+        String typeName = stmtTypeDrop.getTypeName().getValue();
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
+        MetadataLockManager.INSTANCE.dropTypeBegin(dataverseName, dataverseName + "." + typeName);
 
         try {
-            TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-            String dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
-            String typeName = stmtTypeDrop.getTypeName().getValue();
             Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
             if (dt == null) {
                 if (!stmtTypeDrop.getIfExists())
@@ -1491,19 +1507,18 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.dropTypeEnd(dataverseName, dataverseName + "." + typeName);
         }
     }
 
     private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
+        NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
+        String nodegroupName = stmtDelete.getNodeGroupName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
-
+        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(nodegroupName);
         try {
-            NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
-            String nodegroupName = stmtDelete.getNodeGroupName().getValue();
             NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
             if (ng == null) {
                 if (!stmtDelete.getIfExists())
@@ -1517,25 +1532,27 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(nodegroupName);
         }
     }
 
     private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+        CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
+        String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
+        String functionName = cfs.getaAterixFunction().getName();
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
-
+        MetadataLockManager.INSTANCE.functionStatementBegin(dataverse, dataverse + "." + functionName);
         try {
-            CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-            String dataverse = getActiveDataverseName(cfs.getSignature().getNamespace());
+
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
             }
-            Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
-                    .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
-                    Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
+            Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
+                    cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
+                    FunctionKind.SCALAR.toString());
             MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1543,18 +1560,18 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.functionStatementEnd(dataverse, dataverse + "." + functionName);
         }
     }
 
     private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
+        FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
+        FunctionSignature signature = stmtDropFunction.getFunctionSignature();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
-
+        MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(), signature.getNamespace() + "."
+                + signature.getName());
         try {
-            FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
-            FunctionSignature signature = stmtDropFunction.getFunctionSignature();
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
             if (function == null) {
                 if (!stmtDropFunction.getIfExists())
@@ -1567,24 +1584,25 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(), signature.getNamespace() + "."
+                    + signature.getName());
         }
     }
 
     private void handleLoadStatement(AqlMetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc)
             throws Exception {
 
+        LoadStatement loadStmt = (LoadStatement) stmt;
+        String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
+        String datasetName = loadStmt.getDatasetName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
+        MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-            LoadStatement loadStmt = (LoadStatement) stmt;
-            String dataverseName = getActiveDataverseName(loadStmt.getDataverseName());
-            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
-                    .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
-                    loadStmt.dataIsAlreadySorted());
+            CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, datasetName,
+                    loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
 
             IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
             Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
@@ -1615,25 +1633,27 @@
 
             throw e;
         } finally {
-            releaseReadLatch();
+            MetadataLockManager.INSTANCE.modifyDatasetEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
 
     private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        InsertStatement stmtInsert = (InsertStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtInsert.getDataverseName());
+        Query query = stmtInsert.getQuery();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
+        MetadataLockManager.INSTANCE.insertDeleteBegin(dataverseName,
+                dataverseName + "." + stmtInsert.getDatasetName(), query.getDataverses(), query.getDatasets());
 
         try {
             metadataProvider.setWriteTransaction(true);
-            InsertStatement stmtInsert = (InsertStatement) stmt;
-            String dataverseName = getActiveDataverseName(stmtInsert.getDataverseName());
             CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-                    .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
+                    .getValue(), query, stmtInsert.getVarCounter());
+            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -1648,22 +1668,25 @@
             }
             throw e;
         } finally {
-            releaseReadLatch();
+            MetadataLockManager.INSTANCE.insertDeleteEnd(dataverseName,
+                    dataverseName + "." + stmtInsert.getDatasetName(), query.getDataverses(), query.getDatasets());
         }
     }
 
     private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        DeleteStatement stmtDelete = (DeleteStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
+        MetadataLockManager.INSTANCE
+                .insertDeleteBegin(dataverseName, dataverseName + "." + stmtDelete.getDatasetName(),
+                        stmtDelete.getDataverses(), stmtDelete.getDatasets());
 
         try {
             metadataProvider.setWriteTransaction(true);
-            DeleteStatement stmtDelete = (DeleteStatement) stmt;
-            String dataverseName = getActiveDataverseName(stmtDelete.getDataverseName());
             CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
                     stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getVarCounter(),
                     metadataProvider);
@@ -1682,7 +1705,9 @@
             }
             throw e;
         } finally {
-            releaseReadLatch();
+            MetadataLockManager.INSTANCE.insertDeleteEnd(dataverseName,
+                    dataverseName + "." + stmtDelete.getDatasetName(), stmtDelete.getDataverses(),
+                    stmtDelete.getDatasets());
         }
     }
 
@@ -1706,18 +1731,16 @@
     private void handleCreateFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        CreateFeedStatement cfs = (CreateFeedStatement) stmt;
+        String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+        String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
+        MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
 
-        String dataverseName = null;
-        String feedName = null;
         String adaptorName = null;
         Feed feed = null;
         try {
-            CreateFeedStatement cfs = (CreateFeedStatement) stmt;
-            dataverseName = getActiveDataverseName(cfs.getDataverseName());
-            feedName = cfs.getFeedName().getValue();
             adaptorName = cfs.getAdaptorName();
 
             feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -1738,20 +1761,20 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.createFeedEnd(dataverseName, dataverseName + "." + feedName);
         }
     }
 
     private void handleDropFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
+        FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
+        String feedName = stmtFeedDrop.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
+        MetadataLockManager.INSTANCE.dropFeedBegin(dataverseName, dataverseName + "." + feedName);
 
         try {
-            FeedDropStatement stmtFeedDrop = (FeedDropStatement) stmt;
-            String dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
-            String feedName = stmtFeedDrop.getFeedName().getValue();
             Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
             if (feed == null) {
                 if (!stmtFeedDrop.getIfExists()) {
@@ -1785,21 +1808,26 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.dropFeedEnd(dataverseName, dataverseName + "." + feedName);
         }
     }
 
     private void handleConnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
+        ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
+        String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+        String feedName = cfs.getFeedName();
+        String datasetName = cfs.getDatasetName().getValue();
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
+        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName, dataverseName
+                + "." + feedName);
         boolean readLatchAcquired = true;
         try {
-            ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
-            String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+
             metadataProvider.setWriteTransaction(true);
 
             CompiledConnectFeedStatement cbfs = new CompiledConnectFeedStatement(dataverseName, cfs.getFeedName(), cfs
@@ -1859,7 +1887,8 @@
             boolean waitForCompletion = waitForCompletionParam == null ? false : Boolean
                     .valueOf(waitForCompletionParam);
             if (waitForCompletion) {
-                releaseReadLatch();
+                MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                        dataverseName + "." + feedName);
                 readLatchAcquired = false;
             }
             runJob(hcc, newJobSpec, waitForCompletion);
@@ -1870,23 +1899,24 @@
             throw e;
         } finally {
             if (readLatchAcquired) {
-                releaseReadLatch();
+                MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                        dataverseName + "." + feedName);
             }
         }
     }
 
     private void handleDisconnectFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
+        DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
+        String dataverseName = getActiveDataverseName(cfs.getDataverseName());
+        String datasetName = cfs.getDatasetName().getValue();
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
-
+        MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
+                dataverseName + "." + cfs.getFeedName());
         try {
-            DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
-            String dataverseName = getActiveDataverseName(cfs.getDataverseName());
-
-            String datasetName = cfs.getDatasetName().getValue();
             Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
                     dataverseName, cfs.getDatasetName().getValue());
             if (dataset == null) {
@@ -1925,24 +1955,23 @@
             }
             throw e;
         } finally {
-            releaseReadLatch();
+            MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
+                    dataverseName + "." + cfs.getFeedName());
         }
     }
 
     private void handleCompactStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
+        CompactStatement compactStatement = (CompactStatement) stmt;
+        String dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
+        String datasetName = compactStatement.getDatasetName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
+        MetadataLockManager.INSTANCE.compactBegin(dataverseName, dataverseName + "." + datasetName);
 
-        String dataverseName = null;
-        String datasetName = null;
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
-            CompactStatement compactStatement = (CompactStatement) stmt;
-            dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
-            datasetName = compactStatement.getDatasetName().getValue();
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
             if (ds == null) {
                 throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
@@ -1991,7 +2020,7 @@
             }
             throw e;
         } finally {
-            releaseReadLatch();
+            MetadataLockManager.INSTANCE.compactEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
 
@@ -2001,7 +2030,7 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
+        MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets());
         JobSpecification compiled = null;
         try {
             compiled = rewriteCompileQuery(metadataProvider, query, null);
@@ -2083,21 +2112,22 @@
             }
             throw e;
         } finally {
-            releaseReadLatch();
-            // release locks aquired during compilation of the query
+            MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
+            // release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
     private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
 
+        NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
+        String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireWriteLatch();
+        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(ngName);
 
         try {
-            NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
-            String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
             NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
             if (ng != null) {
                 if (!stmtCreateNodegroup.getIfNotExists())
@@ -2115,20 +2145,20 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            releaseWriteLatch();
+            MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(ngName);
         }
     }
 
     private void handleExternalDatasetRefreshStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         RefreshExternalDatasetStatement stmtRefresh = (RefreshExternalDatasetStatement) stmt;
+        String dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName());
+        String datasetName = stmtRefresh.getDatasetName().getValue();
         ExternalDatasetTransactionState transactionState = ExternalDatasetTransactionState.COMMIT;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        acquireWriteLatch();
+        MetadataLockManager.INSTANCE.refreshDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        String dataverseName = null;
-        String datasetName = null;
         JobSpecification spec = null;
         Dataset ds = null;
         List<ExternalFile> metadataFiles = null;
@@ -2140,8 +2170,6 @@
         boolean lockAquired = false;
         boolean success = false;
         try {
-            dataverseName = getActiveDataverseName(stmtRefresh.getDataverseName());
-            datasetName = stmtRefresh.getDatasetName().getValue();
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
 
@@ -2338,10 +2366,10 @@
                 }
             }
         } finally {
-            releaseWriteLatch();
             if (lockAquired) {
                 ExternalDatasetsRegistry.INSTANCE.refreshEnd(ds, success);
             }
+            MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
 
@@ -2393,22 +2421,6 @@
         return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
     }
 
-    private void acquireWriteLatch() {
-        MetadataManager.INSTANCE.acquireWriteLatch();
-    }
-
-    private void releaseWriteLatch() {
-        MetadataManager.INSTANCE.releaseWriteLatch();
-    }
-
-    private void acquireReadLatch() {
-        MetadataManager.INSTANCE.acquireReadLatch();
-    }
-
-    private void releaseReadLatch() {
-        MetadataManager.INSTANCE.releaseReadLatch();
-    }
-
     private void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
         try {
             if (IS_DEBUG_MODE) {
@@ -2420,4 +2432,4 @@
             throw new IllegalStateException(rootE);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java
index ded6946..f50b91e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/DeleteStatement.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.asterix.aql.expression;
 
+import java.util.List;
+
 import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
@@ -27,14 +29,18 @@
     private Identifier datasetName;
     private Expression condition;
     private int varCounter;
+    private List<String> dataverses;
+    private List<String> datasets;
 
     public DeleteStatement(VariableExpr vars, Identifier dataverseName, Identifier datasetName, Expression condition,
-            int varCounter) {
+            int varCounter, List<String> dataverses, List<String> datasets) {
         this.vars = vars;
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.condition = condition;
         this.varCounter = varCounter;
+        this.dataverses = dataverses;
+        this.datasets = datasets;
     }
 
     @Override
@@ -72,4 +78,12 @@
         visitor.visit(this, arg);
     }
 
+    public List<String> getDataverses() {
+        return dataverses;
+    }
+
+    public List<String> getDatasets() {
+        return datasets;
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java
index 5f58a38..a22d4b4 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ExternalDetailsDecl.java
@@ -17,36 +17,38 @@
 import java.util.Map;
 
 public class ExternalDetailsDecl implements IDatasetDetailsDecl {
-	private Map<String, String> properties;
-	private String adapter;
-	private Identifier nodegroupName;
-	private String compactionPolicy;
+    private Map<String, String> properties;
+    private String adapter;
+    private Identifier nodegroupName;
+    private String compactionPolicy;
     private Map<String, String> compactionPolicyProperties;
 
-	public void setAdapter(String adapter) {
-		this.adapter = adapter;
-	}
+    public void setAdapter(String adapter) {
+        this.adapter = adapter;
+    }
 
-	public void setProperties(Map<String, String> properties) {
-		this.properties = properties;
-	}
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
 
-	public String getAdapter() {
-		return adapter;
-	}
+    public String getAdapter() {
+        return adapter;
+    }
 
-	public Map<String, String> getProperties() {
-		return properties;
-	}
+    public Map<String, String> getProperties() {
+        return properties;
+    }
 
-	public Identifier getNodegroupName() {
-		return nodegroupName;
-	}
+    @Override
+    public Identifier getNodegroupName() {
+        return nodegroupName;
+    }
 
-	public void setNodegroupName(Identifier nodegroupName) {
-		this.nodegroupName = nodegroupName;
-	}
+    public void setNodegroupName(Identifier nodegroupName) {
+        this.nodegroupName = nodegroupName;
+    }
 
+    @Override
     public String getCompactionPolicy() {
         return compactionPolicy;
     }
@@ -55,6 +57,7 @@
         this.compactionPolicy = compactionPolicy;
     }
 
+    @Override
     public Map<String, String> getCompactionPolicyProperties() {
         return compactionPolicyProperties;
     }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java
index 0f9a74c..51f2a1b 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/IDatasetDetailsDecl.java
@@ -14,6 +14,14 @@
  */
 package edu.uci.ics.asterix.aql.expression;
 
+import java.util.Map;
+
 public interface IDatasetDetailsDecl {
 
+    public Identifier getNodegroupName();
+
+    public String getCompactionPolicy();
+
+    public Map<String, String> getCompactionPolicyProperties();
+
 }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java
index 3be08b2..fc71a8f 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/InternalDetailsDecl.java
@@ -42,6 +42,7 @@
         return partitioningExprs;
     }
 
+    @Override
     public Identifier getNodegroupName() {
         return nodegroupName;
     }
@@ -50,10 +51,12 @@
         return autogenerated;
     }
 
+    @Override
     public String getCompactionPolicy() {
         return compactionPolicy;
     }
 
+    @Override
     public Map<String, String> getCompactionPolicyProperties() {
         return compactionPolicyProperties;
     }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java
index b8538b8..665f3b7 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/Query.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.asterix.aql.expression;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import edu.uci.ics.asterix.aql.base.Expression;
 import edu.uci.ics.asterix.aql.base.Statement;
 import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
@@ -23,6 +26,8 @@
 public class Query implements Statement {
     private Expression body;
     private int varCounter;
+    private List<String> dataverses = new ArrayList<String>();
+    private List<String> datasets = new ArrayList<String>();
 
     public Expression getBody() {
         return body;
@@ -54,4 +59,20 @@
     public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visitQuery(this, arg);
     }
+
+    public void setDataverses(List<String> dataverses) {
+        this.dataverses = dataverses;
+    }
+
+    public void setDatasets(List<String> datasets) {
+        this.datasets = datasets;
+    }
+
+    public List<String> getDataverses() {
+        return dataverses;
+    }
+
+    public List<String> getDatasets() {
+        return datasets;
+    }
 }
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java
index d95596b..12c5de6 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/parser/ScopeChecker.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.aql.parser;
 
+import java.util.List;
 import java.util.Stack;
 
 import edu.uci.ics.asterix.aql.context.Scope;
@@ -33,6 +34,9 @@
 
     protected String defaultDataverse;
 
+    private List<String> dataverses;
+    private List<String> datasets;
+
     protected void setInput(String s) {
         inputLines = s.split("\n");
     }
@@ -174,4 +178,31 @@
         return extract.toString().trim();
     }
 
+    public void addDataverse(String dataverseName) {
+        if (dataverses != null) {
+            dataverses.add(dataverseName);
+        }
+    }
+
+    public void addDataset(String datasetName) {
+        if (datasets != null) {
+            datasets.add(datasetName);
+        }
+    }
+
+    public void setDataverses(List<String> dataverses) {
+        this.dataverses = dataverses;
+    }
+
+    public void setDatasets(List<String> datasets) {
+        this.datasets = datasets;
+    }
+
+    public List<String> getDataverses() {
+        return dataverses;
+    }
+
+    public List<String> getDatasets() {
+        return datasets;
+    }
 }
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 430ea7d..bb78860 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -629,7 +629,7 @@
       }
 
       // TODO use fctName.library
-      String fqFunctionName = fctName.library == null ? fctName.function : fctName.library + "#" + fctName.function;
+      String fqFunctionName = fctName.library == null ? fctName.function : fctName.library + "#" + fctName.function;      
       return new FunctionSignature(fctName.dataverse, fqFunctionName, arity);
     }
 }
@@ -731,6 +731,10 @@
   VariableExpr var = null;
   Expression condition = null;
   Pair<Identifier, Identifier> nameComponents;
+  // This is related to the new metadata lock management
+  setDataverses(new ArrayList<String>());
+  setDatasets(new ArrayList<String>());
+  
 }
 {
   "delete" var = Variable()
@@ -740,7 +744,14 @@
   "from" <DATASET> nameComponents  = QualifiedName() 
   (<WHERE> condition = Expression())?
     {
-      return new DeleteStatement(var, nameComponents.first, nameComponents.second, condition, getVarCounter());
+      // First we get the dataverses and datasets that we want to lock
+      List<String> dataverses = getDataverses();
+      List<String> datasets = getDatasets();
+      // we remove the pointer to the dataverses and datasets
+      setDataverses(null);
+      setDatasets(null);
+      return new DeleteStatement(var, nameComponents.first, nameComponents.second, 
+      	condition, getVarCounter(), dataverses, datasets);
     }
 }
 
@@ -1233,6 +1244,9 @@
 Query Query() throws ParseException:
 {
   Query query = new Query();
+  // we set the pointers to the dataverses and datasets lists to fill them with entities to be locked
+  setDataverses(query.getDataverses());
+  setDatasets(query.getDatasets());
   Expression expr;
 }
 {
@@ -1240,6 +1254,9 @@
     {
       query.setBody(expr);
       query.setVarCounter(getVarCounter());
+      // we remove the pointers to the locked entities before we return the query object
+      setDataverses(null);
+      setDatasets(null);
       return query;
     }
      
@@ -1833,6 +1850,12 @@
       LiteralExpr ds = new LiteralExpr();
       ds.setValue( new StringLiteral(name) );
       nameArg = ds;
+      if(arg2 != null){
+      	addDataverse(arg1.toString());
+      	addDataset(name);
+      } else {
+      	addDataset(defaultDataverse + "." + name);
+      }
     }
   | ( <LEFTPAREN> nameArg = Expression() <RIGHTPAREN> ) )  
     {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetLock.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetLock.java
new file mode 100644
index 0000000..6ca5c42
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/DatasetLock.java
@@ -0,0 +1,88 @@
+package edu.uci.ics.asterix.metadata.utils;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+
+public class DatasetLock {
+
+    private ReentrantReadWriteLock dsLock;
+    private ReentrantReadWriteLock dsModifyLock;
+    private AMutableInt32 indexBuildCounter;
+
+    public DatasetLock() {
+        dsLock = new ReentrantReadWriteLock(true);
+        dsModifyLock = new ReentrantReadWriteLock(true);
+        indexBuildCounter = new AMutableInt32(0);
+    }
+
+    public void acquireReadLock() {
+        // query
+        // build index
+        // insert
+        dsLock.readLock().lock();
+    }
+
+    public void releaseReadLock() {
+        // query
+        // build index
+        // insert
+        dsLock.readLock().unlock();
+    }
+
+    public void acquireWriteLock() {
+        // create ds
+        // delete ds
+        // drop index
+        dsLock.writeLock().lock();
+    }
+
+    public void releaseWriteLock() {
+        // create ds
+        // delete ds
+        // drop index
+        dsLock.writeLock().unlock();
+    }
+
+    public void acquireReadModifyLock() {
+        // insert
+        dsModifyLock.readLock().lock();
+    }
+
+    public void releaseReadModifyLock() {
+        // insert
+        dsModifyLock.readLock().unlock();
+    }
+
+    public void acquireWriteModifyLock() {
+        // Build index statement
+        synchronized (indexBuildCounter) {
+            if (indexBuildCounter.getIntegerValue() > 0) {
+                indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1);
+            } else {
+                dsModifyLock.writeLock().lock();
+                indexBuildCounter.setValue(1);
+            }
+        }
+    }
+
+    public void releaseWriteModifyLock() {
+        // Build index statement
+        synchronized (indexBuildCounter) {
+            if (indexBuildCounter.getIntegerValue() == 1) {
+                dsModifyLock.writeLock().unlock();
+            }
+            indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1);
+        }
+    }
+
+    public void acquireRefreshLock() {
+        // Refresh External Dataset statement
+        dsModifyLock.writeLock().lock();
+    }
+
+    public void releaseRefreshLock() {
+        // Refresh External Dataset statement
+        dsModifyLock.writeLock().unlock();
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java
index 79d695c..b6ab546 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetAccessManager.java
@@ -67,13 +67,21 @@
         datasetLock.writeLock().unlock();
     }
 
-    public int buildIndexBegin() {
-        datasetLock.readLock().lock();
+    public synchronized int buildIndexBegin(boolean isFirstIndex) {
+        if (isFirstIndex) {
+            datasetLock.writeLock().lock();
+        } else {
+            datasetLock.readLock().lock();
+        }
         return version;
     }
 
-    public void buildIndexEnd() {
-        datasetLock.readLock().unlock();
+    public void buildIndexEnd(boolean isFirstIndex) {
+        if (isFirstIndex) {
+            datasetLock.writeLock().unlock();
+        } else {
+            datasetLock.readLock().unlock();
+        }
     }
 
     public int queryBegin() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java
index 84d25d3..528dd50 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/ExternalDatasetsRegistry.java
@@ -4,10 +4,10 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.Dataverse;
 
 /**
  * This is a singelton class used to maintain the version of each external dataset with indexes
@@ -17,10 +17,10 @@
  */
 public class ExternalDatasetsRegistry {
     public static ExternalDatasetsRegistry INSTANCE = new ExternalDatasetsRegistry();
-    private HashMap<String, HashMap<String, ExternalDatasetAccessManager>> globalRegister;
+    private ConcurrentHashMap<String, ExternalDatasetAccessManager> globalRegister;
 
     private ExternalDatasetsRegistry() {
-        globalRegister = new HashMap<String, HashMap<String, ExternalDatasetAccessManager>>();
+        globalRegister = new ConcurrentHashMap<String, ExternalDatasetAccessManager>();
     }
 
     /**
@@ -30,23 +30,11 @@
      * @return
      */
     public int getDatasetVersion(Dataset dataset) {
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
-        ExternalDatasetAccessManager datasetAccessMgr;
-        synchronized (this) {
-            dataverseRegister = globalRegister.get(dataset.getDataverseName());
-            if (dataverseRegister == null) {
-                // Create a register for the dataverse, and put the dataset their with the initial value of 0
-                dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
-                datasetAccessMgr = new ExternalDatasetAccessManager();
-                dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                globalRegister.put(dataset.getDataverseName(), dataverseRegister);
-            } else {
-                datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
-                if (datasetAccessMgr == null) {
-                    datasetAccessMgr = new ExternalDatasetAccessManager();
-                    dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                }
-            }
+        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
+        if (datasetAccessMgr == null) {
+            globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
+            datasetAccessMgr = globalRegister.get(key);
         }
         return datasetAccessMgr.getVersion();
     }
@@ -68,31 +56,10 @@
             }
         }
 
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
-        ExternalDatasetAccessManager datasetAccessMgr;
-        dataverseRegister = globalRegister.get(dataset.getDataverseName());
-        if (dataverseRegister == null) {
-            synchronized (this) {
-                // A second time synchronized just to make sure
-                dataverseRegister = globalRegister.get(dataset.getDataverseName());
-                if (dataverseRegister == null) {
-                    // Create a register for the dataverse, and put the dataset their with the initial value of 0
-                    dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
-                    globalRegister.put(dataset.getDataverseName(), dataverseRegister);
-                }
-            }
-        }
-
-        datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
+        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(lockKey);
         if (datasetAccessMgr == null) {
-            synchronized (this) {
-                // a second time synchronized just to make sure
-                datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
-                if (datasetAccessMgr == null) {
-                    datasetAccessMgr = new ExternalDatasetAccessManager();
-                    dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                }
-            }
+            globalRegister.putIfAbsent(lockKey, new ExternalDatasetAccessManager());
+            datasetAccessMgr = globalRegister.get(lockKey);
         }
 
         // aquire the correct lock
@@ -102,74 +69,39 @@
     }
 
     public void refreshBegin(Dataset dataset) {
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
-        ExternalDatasetAccessManager datasetAccessMgr;
-        synchronized (this) {
-            dataverseRegister = globalRegister.get(dataset.getDataverseName());
-            if (dataverseRegister == null) {
-                // Create a register for the dataverse, and put the dataset their with the initial value of 0
-                dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
-                datasetAccessMgr = new ExternalDatasetAccessManager();
-                dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                globalRegister.put(dataset.getDataverseName(), dataverseRegister);
-            } else {
-                datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
-                if (datasetAccessMgr == null) {
-                    datasetAccessMgr = new ExternalDatasetAccessManager();
-                    dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                }
-            }
+        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
+        if (datasetAccessMgr == null) {
+            datasetAccessMgr = globalRegister.put(key, new ExternalDatasetAccessManager());
         }
         // aquire the correct lock
         datasetAccessMgr.refreshBegin();
     }
 
-    public synchronized void removeDatasetInfo(Dataset dataset) {
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegister = globalRegister
-                .get(dataset.getDataverseName());
-        if (dataverseRegister != null) {
-            dataverseRegister.remove(dataset.getDatasetName());
-        }
-    }
-
-    public synchronized void removeDataverse(Dataverse dataverse) {
-        globalRegister.remove(dataverse.getDataverseName());
+    public void removeDatasetInfo(Dataset dataset) {
+        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+        globalRegister.remove(key);
     }
 
     public void refreshEnd(Dataset dataset, boolean success) {
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegistry = globalRegister
-                .get(dataset.getDataverseName());
-        ExternalDatasetAccessManager datasetLockManager = dataverseRegistry.get(dataset.getDatasetName());
-        datasetLockManager.refreshEnd(success);
+        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+        globalRegister.get(key).refreshEnd(success);
     }
 
-    public void buildIndexBegin(Dataset dataset) {
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegister;
-        ExternalDatasetAccessManager datasetAccessMgr;
-        synchronized (this) {
-            dataverseRegister = globalRegister.get(dataset.getDataverseName());
-            if (dataverseRegister == null) {
-                // Create a register for the dataverse, and put the dataset their with the initial value of 0
-                dataverseRegister = new HashMap<String, ExternalDatasetAccessManager>();
-                datasetAccessMgr = new ExternalDatasetAccessManager();
-                dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                globalRegister.put(dataset.getDataverseName(), dataverseRegister);
-            } else {
-                datasetAccessMgr = dataverseRegister.get(dataset.getDatasetName());
-                if (datasetAccessMgr == null) {
-                    datasetAccessMgr = new ExternalDatasetAccessManager();
-                    dataverseRegister.put(dataset.getDatasetName(), datasetAccessMgr);
-                }
-            }
+    public void buildIndexBegin(Dataset dataset, boolean firstIndex) {
+        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+        ExternalDatasetAccessManager datasetAccessMgr = globalRegister.get(key);
+        if (datasetAccessMgr == null) {
+            globalRegister.putIfAbsent(key, new ExternalDatasetAccessManager());
+            datasetAccessMgr = globalRegister.get(key);
         }
-        datasetAccessMgr.buildIndexBegin();
+        // aquire the correct lock
+        datasetAccessMgr.buildIndexBegin(firstIndex);
     }
 
-    public void buildIndexEnd(Dataset dataset) {
-        HashMap<String, ExternalDatasetAccessManager> dataverseRegistry = globalRegister
-                .get(dataset.getDataverseName());
-        ExternalDatasetAccessManager datasetLockManager = dataverseRegistry.get(dataset.getDatasetName());
-        datasetLockManager.buildIndexEnd();
+    public void buildIndexEnd(Dataset dataset, boolean firstIndex) {
+        String key = dataset.getDataverseName() + "." + dataset.getDatasetName();
+        globalRegister.get(key).buildIndexEnd(firstIndex);
     }
 
     public void releaseAcquiredLocks(AqlMetadataProvider metadataProvider) {
@@ -180,12 +112,7 @@
             // if dataset was accessed already by this job, return the registered version
             Set<Entry<String, Integer>> aquiredLocks = locks.entrySet();
             for (Entry<String, Integer> entry : aquiredLocks) {
-                //Get dataverse name
-                String dvName = entry.getKey().substring(0, entry.getKey().indexOf("."));
-                String dsName = entry.getKey().substring(entry.getKey().indexOf(".") + 1);
-                HashMap<String, ExternalDatasetAccessManager> dataverseRegistry = globalRegister.get(dvName);
-                ExternalDatasetAccessManager datasetLockManager = dataverseRegistry.get(dsName);
-                datasetLockManager.queryEnd(entry.getValue());
+                globalRegister.get(entry.getKey()).queryEnd(entry.getValue());
             }
             locks.clear();
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
new file mode 100644
index 0000000..02e40b2
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
@@ -0,0 +1,512 @@
+package edu.uci.ics.asterix.metadata.utils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+
+public class MetadataLockManager {
+
+    public static MetadataLockManager INSTANCE = new MetadataLockManager();
+    private ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
+    private ConcurrentHashMap<String, DatasetLock> datasetsLocks;
+    private ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
+    private ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
+    private ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
+    private ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
+    private ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+
+    private MetadataLockManager() {
+        dataversesLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        datasetsLocks = new ConcurrentHashMap<String, DatasetLock>();
+        functionsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        nodeGroupsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        feedsLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        compactionPolicyLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+        dataTypeLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
+    }
+
+    public void acquireDataverseReadLock(String dataverseName) {
+        ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
+        if (dvLock == null) {
+            dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
+            dvLock = dataversesLocks.get(dataverseName);
+        }
+        dvLock.readLock().lock();
+    }
+
+    public void releaseDataverseReadLock(String dataverseName) {
+        dataversesLocks.get(dataverseName).readLock().unlock();
+    }
+
+    public void acquireDataverseWriteLock(String dataverseName) {
+        ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
+        if (dvLock == null) {
+            dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
+            dvLock = dataversesLocks.get(dataverseName);
+        }
+        dvLock.writeLock().lock();
+    }
+
+    public void releaseDataverseWriteLock(String dataverseName) {
+        dataversesLocks.get(dataverseName).writeLock().unlock();
+    }
+
+    public void acquireDatasetReadLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        if (dsLock == null) {
+            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+            dsLock = datasetsLocks.get(datasetName);
+        }
+        dsLock.acquireReadLock();
+    }
+
+    public void releaseDatasetReadLock(String datasetName) {
+        datasetsLocks.get(datasetName).releaseReadLock();
+    }
+
+    public void acquireDatasetWriteLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        if (dsLock == null) {
+            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+            dsLock = datasetsLocks.get(datasetName);
+        }
+        dsLock.acquireWriteLock();
+    }
+
+    public void releaseDatasetWriteLock(String datasetName) {
+        datasetsLocks.get(datasetName).releaseWriteLock();
+    }
+
+    public void acquireDatasetModifyLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        if (dsLock == null) {
+            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+            dsLock = datasetsLocks.get(datasetName);
+        }
+        dsLock.acquireReadLock();
+        dsLock.acquireReadModifyLock();
+    }
+
+    public void releaseDatasetModifyLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        dsLock.releaseReadModifyLock();
+        dsLock.releaseReadLock();
+    }
+
+    public void acquireDatasetCreateIndexLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        if (dsLock == null) {
+            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+            dsLock = datasetsLocks.get(datasetName);
+        }
+        dsLock.acquireReadLock();
+        dsLock.acquireWriteModifyLock();
+    }
+
+    public void releaseDatasetCreateIndexLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        dsLock.releaseWriteModifyLock();
+        dsLock.releaseReadLock();
+    }
+
+    public void acquireExternalDatasetRefreshLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        if (dsLock == null) {
+            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
+            dsLock = datasetsLocks.get(datasetName);
+        }
+        dsLock.acquireReadLock();
+        dsLock.acquireRefreshLock();
+    }
+
+    public void releaseExternalDatasetRefreshLock(String datasetName) {
+        DatasetLock dsLock = datasetsLocks.get(datasetName);
+        dsLock.releaseRefreshLock();
+        dsLock.releaseReadLock();
+    }
+
+    public void acquireFunctionReadLock(String functionName) {
+        ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
+        if (fLock == null) {
+            functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
+            fLock = functionsLocks.get(functionName);
+        }
+        fLock.readLock().lock();
+    }
+
+    public void releaseFunctionReadLock(String functionName) {
+        functionsLocks.get(functionName).readLock().unlock();
+    }
+
+    public void acquireFunctionWriteLock(String functionName) {
+        ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
+        if (fLock == null) {
+            functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
+            fLock = functionsLocks.get(functionName);
+        }
+        fLock.writeLock().lock();
+    }
+
+    public void releaseFunctionWriteLock(String functionName) {
+        functionsLocks.get(functionName).writeLock().unlock();
+    }
+
+    public void acquireNodeGroupReadLock(String nodeGroupName) {
+        ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
+        if (ngLock == null) {
+            nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
+            ngLock = nodeGroupsLocks.get(nodeGroupName);
+        }
+        ngLock.readLock().lock();
+    }
+
+    public void releaseNodeGroupReadLock(String nodeGroupName) {
+        nodeGroupsLocks.get(nodeGroupName).readLock().unlock();
+    }
+
+    public void acquireNodeGroupWriteLock(String nodeGroupName) {
+        ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
+        if (ngLock == null) {
+            nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
+            ngLock = nodeGroupsLocks.get(nodeGroupName);
+        }
+        ngLock.writeLock().lock();
+    }
+
+    public void releaseNodeGroupWriteLock(String nodeGroupName) {
+        nodeGroupsLocks.get(nodeGroupName).writeLock().unlock();
+    }
+
+    public void acquireFeedReadLock(String feedName) {
+        ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
+        if (fLock == null) {
+            feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
+            fLock = feedsLocks.get(feedName);
+        }
+        fLock.readLock().lock();
+    }
+
+    public void releaseFeedReadLock(String feedName) {
+        feedsLocks.get(feedName).readLock().unlock();
+    }
+
+    public void acquireFeedWriteLock(String feedName) {
+        ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
+        if (fLock == null) {
+            feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
+            fLock = feedsLocks.get(feedName);
+        }
+        fLock.writeLock().lock();
+    }
+
+    public void releaseFeedWriteLock(String feedName) {
+        feedsLocks.get(feedName).writeLock().unlock();
+    }
+
+    public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
+        ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+        if (compactionPolicyLock == null) {
+            compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
+            compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+        }
+        compactionPolicyLock.readLock().lock();
+    }
+
+    public void releaseCompactionPolicyReadLock(String compactionPolicyName) {
+        compactionPolicyLocks.get(compactionPolicyName).readLock().unlock();
+    }
+
+    public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
+        ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+        if (compactionPolicyLock == null) {
+            compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
+            compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
+        }
+        compactionPolicyLock.writeLock().lock();
+    }
+
+    public void releaseCompactionPolicyWriteLock(String compactionPolicyName) {
+        compactionPolicyLocks.get(compactionPolicyName).writeLock().unlock();
+    }
+
+    public void acquireDataTypeReadLock(String dataTypeName) {
+        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
+        if (dataTypeLock == null) {
+            dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
+            dataTypeLock = dataTypeLocks.get(dataTypeName);
+        }
+        dataTypeLock.readLock().lock();
+    }
+
+    public void releaseDataTypeReadLock(String dataTypeName) {
+        dataTypeLocks.get(dataTypeName).readLock().unlock();
+    }
+
+    public void acquireDataTypeWriteLock(String dataTypeName) {
+        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
+        if (dataTypeLock == null) {
+            dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
+            dataTypeLock = dataTypeLocks.get(dataTypeName);
+        }
+        dataTypeLock.writeLock().lock();
+    }
+
+    public void releaseDataTypeWriteLock(String dataTypeName) {
+        dataTypeLocks.get(dataTypeName).writeLock().unlock();
+    }
+
+    public void createDatasetBegin(String dataverseName, String itemTypeFullyQualifiedName, String nodeGroupName,
+            String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDataTypeReadLock(itemTypeFullyQualifiedName);
+        acquireNodeGroupReadLock(nodeGroupName);
+        if (!isDefaultCompactionPolicy) {
+            acquireCompactionPolicyReadLock(compactionPolicyName);
+        }
+        acquireDatasetWriteLock(datasetFullyQualifiedName);
+    }
+
+    public void createDatasetEnd(String dataverseName, String itemTypeFullyQualifiedName, String nodeGroupName,
+            String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
+        releaseDatasetWriteLock(datasetFullyQualifiedName);
+        if (!isDefaultCompactionPolicy) {
+            releaseCompactionPolicyReadLock(compactionPolicyName);
+        }
+        releaseNodeGroupReadLock(nodeGroupName);
+        releaseDataTypeReadLock(itemTypeFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void createIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetCreateIndexLock(datasetFullyQualifiedName);
+    }
+
+    public void createIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
+        releaseDatasetCreateIndexLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void createTypeBegin(String dataverseName, String itemTypeFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDataTypeWriteLock(itemTypeFullyQualifiedName);
+    }
+
+    public void createTypeEnd(String dataverseName, String itemTypeFullyQualifiedName) {
+        releaseDataTypeWriteLock(itemTypeFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void dropDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetWriteLock(datasetFullyQualifiedName);
+    }
+
+    public void dropDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
+        releaseDatasetWriteLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void dropIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetWriteLock(datasetFullyQualifiedName);
+    }
+
+    public void dropIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
+        releaseDatasetWriteLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void dropTypeBegin(String dataverseName, String dataTypeFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDataTypeWriteLock(dataTypeFullyQualifiedName);
+    }
+
+    public void dropTypeEnd(String dataverseName, String dataTypeFullyQualifiedName) {
+        releaseDataTypeWriteLock(dataTypeFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void functionStatementBegin(String dataverseName, String functionFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireFunctionWriteLock(functionFullyQualifiedName);
+    }
+
+    public void functionStatementEnd(String dataverseName, String functionFullyQualifiedName) {
+        releaseFunctionWriteLock(functionFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void modifyDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetModifyLock(datasetFullyQualifiedName);
+    }
+
+    public void modifyDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
+        releaseDatasetModifyLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void insertDeleteBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+            List<String> datasets) {
+        dataverses.add(dataverseName);
+        datasets.add(datasetFullyQualifiedName);
+        Collections.sort(dataverses);
+        Collections.sort(datasets);
+
+        String previous = null;
+        for (int i = 0; i < dataverses.size(); i++) {
+            String current = dataverses.get(i);
+            if (!current.equals(previous)) {
+                acquireDataverseReadLock(current);
+                previous = current;
+            }
+        }
+
+        for (int i = 0; i < datasets.size(); i++) {
+            String current = datasets.get(i);
+            if (!current.equals(previous)) {
+                if (current.equals(datasetFullyQualifiedName)) {
+                    acquireDatasetModifyLock(current);
+                } else {
+                    acquireDatasetReadLock(current);
+                }
+                previous = current;
+            }
+        }
+    }
+
+    public void insertDeleteEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+            List<String> datasets) {
+        String previous = null;
+        for (int i = dataverses.size() - 1; i >= 0; i--) {
+            String current = dataverses.get(i);
+            if (!current.equals(previous)) {
+                releaseDataverseReadLock(current);
+                previous = current;
+            }
+        }
+        for (int i = datasets.size() - 1; i >= 0; i--) {
+            String current = datasets.get(i);
+            if (!current.equals(previous)) {
+                if (current.equals(datasetFullyQualifiedName)) {
+                    releaseDatasetModifyLock(current);
+                } else {
+                    releaseDatasetReadLock(current);
+                }
+                previous = current;
+            }
+        }
+    }
+
+    public void dropFeedBegin(String dataverseName, String feedFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireFeedWriteLock(feedFullyQualifiedName);
+    }
+
+    public void dropFeedEnd(String dataverseName, String feedFullyQualifiedName) {
+        releaseFeedWriteLock(feedFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireFeedWriteLock(feedFullyQualifiedName);
+    }
+
+    public void createFeedEnd(String dataverseName, String feedFullyQualifiedName) {
+        releaseFeedWriteLock(feedFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void connectFeedBegin(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetReadLock(datasetFullyQualifiedName);
+        acquireFeedReadLock(feedFullyQualifiedName);
+    }
+
+    public void connectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+        releaseFeedReadLock(feedFullyQualifiedName);
+        releaseDatasetReadLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
+            String feedFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetReadLock(datasetFullyQualifiedName);
+        acquireFeedReadLock(feedFullyQualifiedName);
+    }
+
+    public void disconnectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
+        releaseFeedReadLock(feedFullyQualifiedName);
+        releaseDatasetReadLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireDatasetReadLock(datasetFullyQualifiedName);
+    }
+
+    public void compactEnd(String dataverseName, String datasetFullyQualifiedName) {
+        releaseDatasetReadLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+
+    public void queryBegin(Dataverse dataverse, List<String> dataverses, List<String> datasets) {
+        if (dataverse != null) {
+            dataverses.add(dataverse.getDataverseName());
+        }
+        Collections.sort(dataverses);
+        Collections.sort(datasets);
+
+        String previous = null;
+        for (int i = 0; i < dataverses.size(); i++) {
+            String current = dataverses.get(i);
+            if (!current.equals(previous)) {
+                acquireDataverseReadLock(current);
+                previous = current;
+            }
+        }
+
+        for (int i = 0; i < datasets.size(); i++) {
+            String current = datasets.get(i);
+            if (!current.equals(previous)) {
+                acquireDatasetReadLock(current);
+                previous = current;
+            }
+        }
+    }
+
+    public void queryEnd(List<String> dataverses, List<String> datasets) {
+        String previous = null;
+        for (int i = dataverses.size() - 1; i >= 0; i--) {
+            String current = dataverses.get(i);
+            if (!current.equals(previous)) {
+                releaseDataverseReadLock(current);
+                previous = current;
+            }
+        }
+        for (int i = datasets.size() - 1; i >= 0; i--) {
+            String current = datasets.get(i);
+            if (!current.equals(previous)) {
+                releaseDatasetReadLock(current);
+                previous = current;
+            }
+        }
+    }
+
+    public void refreshDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(dataverseName);
+        acquireExternalDatasetRefreshLock(datasetFullyQualifiedName);
+    }
+
+    public void refreshDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
+        releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
+        releaseDataverseReadLock(dataverseName);
+    }
+}