Ensure Metadata locks are acquired for SQL++ queries

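Metadata lock acquisition is now tracked in a per-statement lock list
owned by the MetadataProvider. MetadataLockManager calls take
metadataProvider.getLocks() and register every lock they acquire, and
statement handlers release them all with a single
metadataProvider.getLocks().unlock() in a finally block, replacing the
hand-maintained acquire/release (*Begin/*End) pairs that could fall out
of sync. In addition:
- MetadataLockManager and ExternalDatasetsRegistry move to the new
  org.apache.asterix.metadata.lock package.
- Dataset lookups go through metadataProvider.findDataset(...) instead
  of calling MetadataManager.INSTANCE.getDataset(...) directly.
- Metadata validation (e.g., for connect/disconnect feed) moves inside
  the locked region.

The pattern every handler converges on looks roughly like the sketch
below. This is illustrative only: the surrounding handler is
hypothetical, while the MetadataLockManager and MetadataProvider calls
are the ones used throughout this change (createTypeBegin shown as one
example of the *Begin family):

    // Acquire: locks are recorded in the provider's lock list instead
    // of being held as anonymous global state.
    MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(),
            dataverseName, dataverseName + "." + typeName);
    try {
        // ... statement body: metadata reads/writes under the locks ...
    } finally {
        // Release: one call frees every lock recorded for this
        // statement, replacing the matching createTypeEnd(...) call.
        metadataProvider.getLocks().unlock();
    }
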
Change-Id: I5f468599897a37cbcb12d8577d072f340f0d949c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1642
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c6ea045..5eae36b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -117,8 +117,7 @@
         MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
         Dataset dataset;
         try {
-            dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
-                    jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
+            dataset = metadataProvider.findDataset(jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
@@ -257,8 +256,7 @@
                     jobSpec, queryField, appContext.getStorageManager(), secondarySplitsAndConstraint.first,
                     appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                     invListsTypeTraits, invListsComparatorFactories, dataflowHelperFactory, queryTokenizerFactory,
-                    searchModifierFactory, outputRecDesc, retainInput, retainMissing,
-                    context.getMissingWriterFactory(),
+                    searchModifierFactory, outputRecDesc, retainInput, retainMissing, context.getMissingWriterFactory(),
                     dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
                             ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(),
                             IndexOperation.SEARCH, null),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index ebd1bdb..869289a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -90,44 +90,46 @@
             // Metadata transaction begins.
             MetadataManager.INSTANCE.init();
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-
             // Retrieves file splits of the dataset.
             MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-            if (dataset == null) {
-                jsonResponse.put("error",
-                        "Dataset " + datasetName + " does not exist in " + "dataverse " + dataverseName);
+            try {
+                metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+                if (dataset == null) {
+                    jsonResponse.put("error",
+                            "Dataset " + datasetName + " does not exist in dataverse " + dataverseName);
+                    out.write(jsonResponse.toString());
+                    out.flush();
+                    return;
+                }
+                boolean temp = dataset.getDatasetDetails().isTemp();
+                FileSplit[] fileSplits =
+                        metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
+                ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
+                        dataset.getItemTypeName());
+                List<List<String>> primaryKeys = DatasetUtil.getPartitioningKeys(dataset);
+                StringBuilder pkStrBuf = new StringBuilder();
+                for (List<String> keys : primaryKeys) {
+                    for (String key : keys) {
+                        pkStrBuf.append(key).append(",");
+                    }
+                }
+                pkStrBuf.delete(pkStrBuf.length() - 1, pkStrBuf.length());
+                // Constructs the returned json object.
+                formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
+                        hcc.getNodeControllerInfos());
+
+                // Flush the cached contents of the dataset to file system.
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
+
+                // Metadata transaction commits.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                // Writes file splits.
                 out.write(jsonResponse.toString());
                 out.flush();
-                return;
+            } finally {
+                metadataProvider.getLocks().unlock();
             }
-            boolean temp = dataset.getDatasetDetails().isTemp();
-            FileSplit[] fileSplits =
-                    metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
-            ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
-                    dataset.getItemTypeName());
-            List<List<String>> primaryKeys = DatasetUtil.getPartitioningKeys(dataset);
-            StringBuilder pkStrBuf = new StringBuilder();
-            for (List<String> keys : primaryKeys) {
-                for (String key : keys) {
-                    pkStrBuf.append(key).append(",");
-                }
-            }
-            pkStrBuf.delete(pkStrBuf.length() - 1, pkStrBuf.length());
-
-            // Constructs the returned json object.
-            formResponseObject(jsonResponse, fileSplits, recordType, pkStrBuf.toString(), temp,
-                    hcc.getNodeControllerInfos());
-
-            // Flush the cached contents of the dataset to file system.
-            FlushDatasetUtil.flushDataset(hcc, metadataProvider, dataverseName, datasetName, datasetName);
-
-            // Metadata transaction commits.
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            // Writes file splits.
-            out.write(jsonResponse.toString());
-            out.flush();
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Failure handling a request", e);
             out.println(e.getMessage());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 4ed8510..59b3e88 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -135,13 +135,13 @@
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.NodeGroup;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.metadata.utils.MetadataLockManager;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.metadata.utils.TypeUtil;
 import org.apache.asterix.om.types.ARecordType;
@@ -411,7 +411,7 @@
         String dvName = dvd.getDataverseName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(), dvName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
             if (dv == null) {
@@ -423,7 +423,7 @@
             abort(e, e, mdTxnCtx);
             throw new MetadataException(e);
         } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -434,7 +434,7 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-        MetadataLockManager.INSTANCE.acquireDataverseReadLock(dvName);
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(metadataProvider.getLocks(), dvName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
             if (dv != null) {
@@ -452,7 +452,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseReadLock(dvName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -509,18 +509,15 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createDatasetBegin(dataverseName, itemTypeDataverseName,
-                itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
+        MetadataLockManager.INSTANCE.createDatasetBegin(metadataProvider.getLocks(), dataverseName,
+                itemTypeDataverseName, itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
                 metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
                 dataverseName + "." + datasetName, defaultCompactionPolicy);
         Dataset dataset = null;
         Index primaryIndex = null;
         try {
-
             IDatasetDetails datasetDetails = null;
-            Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds != null) {
                 if (dd.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -671,13 +668,9 @@
                             + "." + datasetName + ") couldn't be removed from the metadata", e);
                 }
             }
-
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.createDatasetEnd(dataverseName, itemTypeDataverseName,
-                    itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
-                    metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
-                    dataverseName + "." + datasetName, defaultCompactionPolicy);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -770,9 +763,8 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.createIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
+        MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName);
         String indexName = null;
         JobSpecification spec = null;
         Dataset ds = null;
@@ -784,8 +776,7 @@
         boolean datasetLocked = false;
         Index index = null;
         try {
-            ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
+            ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
@@ -1105,7 +1096,7 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.createIndexEnd(dataverseName, dataverseName + "." + datasetName);
+            metadataProvider.getLocks().unlock();
             if (datasetLocked) {
                 ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
             }
@@ -1118,7 +1109,8 @@
         String typeName = stmtCreateType.getIdent().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createTypeBegin(dataverseName, dataverseName + "." + typeName);
+        MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + typeName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
@@ -1145,7 +1137,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.createTypeEnd(dataverseName, dataverseName + "." + typeName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1162,8 +1154,8 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(dataverseName);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
+        MetadataLockManager.INSTANCE.acquireDataverseWriteLock(metadataProvider.getLocks(), dataverseName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
@@ -1177,12 +1169,16 @@
             // # disconnect all feeds from any datasets in the dataverse.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             Identifier dvId = new Identifier(dataverseName);
+            MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getDefaultDataverse(),
+                    metadataProvider.getStorageComponentProvider());
+            tempMdProvider.setConfig(metadataProvider.getConfig());
             for (IActiveEntityEventsListener listener : activeListeners) {
                 EntityId activeEntityId = listener.getEntityId();
                 if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
                         && activeEntityId.getDataverse().equals(dataverseName)) {
+                    tempMdProvider.getLocks().reset();
                     stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
-                            metadataProvider);
+                            tempMdProvider);
                     // prepare job to remove feed log storage
                     jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
                             MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
@@ -1199,8 +1195,8 @@
                             MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
-                            jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider,
-                                    datasets.get(j)));
+                            jobsToExecute.add(
+                                    IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, datasets.get(j)));
                         }
                     }
                     Index primaryIndex =
@@ -1215,8 +1211,8 @@
                             jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
                                     datasets.get(j)));
                         } else {
-                            jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider,
-                                    datasets.get(j)));
+                            jobsToExecute.add(
+                                    IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, datasets.get(j)));
                         }
                     }
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
@@ -1285,7 +1281,7 @@
 
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.releaseDataverseWriteLock(dataverseName);
+            metadataProvider.getLocks().unlock();
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -1309,7 +1305,13 @@
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
-        doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+        MetadataLockManager.INSTANCE.dropDatasetBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName);
+        try {
+            doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
     }
 
     public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
@@ -1319,10 +1321,9 @@
                 new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
         MutableBoolean bActiveTxn = new MutableBoolean(true);
         metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
-        MetadataLockManager.INSTANCE.dropDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
         try {
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 if (ifExists) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
@@ -1367,7 +1368,6 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -1382,16 +1382,14 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
-        MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
-
+        List<JobSpecification> jobsToExecute = new ArrayList<>();
+        MetadataLockManager.INSTANCE.dropIndexBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName);
         String indexName = null;
         // For external index
         boolean dropFilesIndex = false;
-        List<JobSpecification> jobsToExecute = new ArrayList<>();
         try {
-
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
@@ -1558,7 +1556,7 @@
             throw e;
 
         } finally {
-            MetadataLockManager.INSTANCE.dropIndexEnd(dataverseName, dataverseName + "." + datasetName);
+            metadataProvider.getLocks().unlock();
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -1571,8 +1569,8 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.dropTypeBegin(dataverseName, dataverseName + "." + typeName);
-
+        MetadataLockManager.INSTANCE.dropTypeBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + typeName);
         try {
             Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
             if (dt == null) {
@@ -1587,7 +1585,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.dropTypeEnd(dataverseName, dataverseName + "." + typeName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1596,7 +1594,7 @@
         String nodegroupName = stmtDelete.getNodeGroupName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(nodegroupName);
+        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodegroupName);
         try {
             NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
             if (ng == null) {
@@ -1612,7 +1610,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(nodegroupName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1624,9 +1622,9 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.functionStatementBegin(dataverse, dataverse + "." + functionName);
+        MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), dataverse,
+                dataverse + "." + functionName);
         try {
-
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
@@ -1641,7 +1639,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.functionStatementEnd(dataverse, dataverse + "." + functionName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1651,7 +1649,7 @@
         signature.setNamespace(getActiveDataverseName(signature.getNamespace()));
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.functionStatementBegin(signature.getNamespace(),
+        MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), signature.getNamespace(),
                 signature.getNamespace() + "." + signature.getName());
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
@@ -1667,8 +1665,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.functionStatementEnd(signature.getNamespace(),
-                    signature.getNamespace() + "." + signature.getName());
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1680,7 +1677,8 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.modifyDatasetBegin(dataverseName, dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE.modifyDatasetBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName);
         try {
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
@@ -1697,7 +1695,7 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.modifyDatasetEnd(dataverseName, dataverseName + "." + datasetName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1707,21 +1705,16 @@
             throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
-        Query query = stmtInsertUpsert.getQuery();
-
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
-                MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
-                        dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
-                        query.getDatasets());
+                MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
+                        dataverseName + "." + stmtInsertUpsert.getDatasetName());
             }
 
             @Override
             public void unlock() {
-                MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
-                        dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
-                        query.getDatasets());
+                metadataProvider.getLocks().unlock();
             }
         };
         final IStatementCompiler compiler = () -> {
@@ -1764,16 +1757,13 @@
 
     public JobSpecification handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, boolean compileOnly) throws Exception {
-
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
-                dataverseName + "." + stmtDelete.getDatasetName(), stmtDelete.getDataverses(),
-                stmtDelete.getDatasets());
-
+        MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(),
+                dataverseName + "." + stmtDelete.getDatasetName());
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
@@ -1788,16 +1778,13 @@
                 JobUtils.runJob(hcc, jobSpec, true);
             }
             return jobSpec;
-
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseName,
-                    dataverseName + "." + stmtDelete.getDatasetName(), stmtDelete.getDataverses(),
-                    stmtDelete.getDatasets());
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1852,7 +1839,8 @@
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createFeedBegin(dataverseName, dataverseName + "." + feedName);
+        MetadataLockManager.INSTANCE.createFeedBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + feedName);
         Feed feed = null;
         try {
             feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -1873,7 +1861,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.createFeedEnd(dataverseName, dataverseName + "." + feedName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1886,7 +1874,8 @@
         CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
         dataverse = getActiveDataverse(null);
         policy = cfps.getPolicyName();
-        MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
+        MetadataLockManager.INSTANCE.createFeedPolicyBegin(metadataProvider.getLocks(), dataverse,
+                dataverse + "." + policy);
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1935,7 +1924,7 @@
             abort(e, e, mdTxnCtx);
             throw new HyracksDataException(e);
         } finally {
-            MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1946,8 +1935,8 @@
         String feedName = stmtFeedDrop.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.dropFeedBegin(dataverseName, dataverseName + "." + feedName);
-
+        MetadataLockManager.INSTANCE.dropFeedBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + feedName);
         try {
             Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
             if (feed == null) {
@@ -1980,7 +1969,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.dropFeedEnd(dataverseName, dataverseName + "." + feedName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -1990,8 +1979,8 @@
         FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt;
         String dataverseName = getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
         String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
-        MetadataLockManager.INSTANCE.dropFeedPolicyBegin(dataverseName, dataverseName + "." + policyName);
-
+        MetadataLockManager.INSTANCE.dropFeedPolicyBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + policyName);
         try {
             FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
             if (feedPolicy == null) {
@@ -2007,7 +1996,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.dropFeedPolicyEnd(dataverseName, dataverseName + "." + policyName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2035,15 +2024,15 @@
             throw new AlgebricksException("Feed " + feedName + " is started already.");
         }
         // Start
+        MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + feedName, feedConnections);
         try {
-            MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, feedConnections);
             // Prepare policy
             List<IDataset> datasets = new ArrayList<>();
             for (FeedConnection connection : feedConnections) {
-                datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, connection.getDataverseName(),
-                        connection.getDatasetName()));
+                Dataset ds = metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName());
+                datasets.add(ds);
             }
-
             org.apache.commons.lang3.tuple.Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
                     FeedOperations.buildStartFeedJob(sessionConfig, metadataProvider, feed, feedConnections,
                             compilationProvider, storageComponentProvider, qtFactory, hcc);
@@ -2064,7 +2053,7 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.startFeedEnd(dataverseName, dataverseName + "." + feedName, feedConnections);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2083,7 +2072,7 @@
         // Transaction
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.StopFeedBegin(dataverseName, feedName);
+        MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), dataverseName, feedName);
         try {
             // validate
             FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, mdTxnCtx);
@@ -2097,7 +2086,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.StopFeedEnd(dataverseName, feedName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2110,17 +2099,17 @@
         String policyName = cfs.getPolicy();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        // validation
-        FeedMetadataUtil.validateIfDatasetExists(dataverseName, datasetName, mdTxnCtx);
-        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
-                metadataProvider.getMetadataTxnContext());
-        ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(),
-                ExternalDataConstants.KEY_TYPE_NAME);
-        List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions();
         // Transaction handling
-        MetadataLockManager.INSTANCE.connectFeedBegin(dataverseName, dataverseName + "." + datasetName,
-                dataverseName + "." + feedName);
+        MetadataLockManager.INSTANCE.connectFeedBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName, dataverseName + "." + feedName);
         try {
+            // validation
+            FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName, mdTxnCtx);
+            Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
+                    metadataProvider.getMetadataTxnContext());
+            ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(),
+                    ExternalDataConstants.KEY_TYPE_NAME);
+            List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions();
             fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
                     feedName, datasetName);
             if (fc != null) {
@@ -2134,8 +2123,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.connectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                    dataverseName + "." + feedName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2146,10 +2134,11 @@
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.disconnectFeedBegin(dataverseName, dataverseName + "." + datasetName,
-                dataverseName + "." + cfs.getFeedName());
+        MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName, dataverseName + "." + cfs.getFeedName());
         try {
-            FeedMetadataUtil.validateIfDatasetExists(dataverseName, cfs.getDatasetName().getValue(), mdTxnCtx);
+            FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue(),
+                    mdTxnCtx);
             FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
             FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
                     dataverseName, feedName, datasetName);
@@ -2162,8 +2151,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.disconnectFeedEnd(dataverseName, dataverseName + "." + datasetName,
-                    dataverseName + "." + cfs.getFeedName());
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2175,10 +2163,11 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.compactBegin(dataverseName, dataverseName + "." + datasetName);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
+        MetadataLockManager.INSTANCE.compactBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName);
         try {
-            Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName + ".");
@@ -2230,7 +2219,7 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.compactEnd(dataverseName, dataverseName + "." + datasetName);
+            metadataProvider.getLocks().unlock();
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -2270,12 +2259,11 @@
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
-                MetadataLockManager.INSTANCE.queryBegin(activeDataverse, query.getDataverses(), query.getDatasets());
             }
 
             @Override
             public void unlock() {
-                MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
+                metadataProvider.getLocks().unlock();
                 // release external datasets' locks acquired during compilation of the query
                 ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
             }
@@ -2389,11 +2377,11 @@
                 printer.print(jobId);
             }
         } finally {
+            locker.unlock();
             // No matter the job succeeds or fails, removes it into the context.
             if (ctx != null && clientContextId != null) {
                 ctx.removeJobIdFromClientContextId(clientContextId);
             }
-            locker.unlock();
         }
     }
 
@@ -2404,8 +2392,7 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(ngName);
-
+        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName);
         try {
             NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
             if (ng != null) {
@@ -2425,7 +2412,7 @@
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.releaseNodeGroupWriteLock(ngName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2436,7 +2423,6 @@
         String datasetName = stmtRefresh.getDatasetName().getValue();
         TransactionState transactionState = TransactionState.COMMIT;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        MetadataLockManager.INSTANCE.refreshDatasetBegin(dataverseName, dataverseName + "." + datasetName);
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         JobSpecification spec = null;
@@ -2449,10 +2435,10 @@
         Dataset transactionDataset = null;
         boolean lockAquired = false;
         boolean success = false;
+        MetadataLockManager.INSTANCE.refreshDatasetBegin(metadataProvider.getLocks(), dataverseName,
+                dataverseName + "." + datasetName);
         try {
-            ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName);
-
+            ds = metadataProvider.findDataset(dataverseName, datasetName);
             // Dataset exists ?
             if (ds == null) {
                 throw new AlgebricksException(
@@ -2649,7 +2635,7 @@
             if (lockAquired) {
                 ExternalDatasetsRegistry.INSTANCE.refreshEnd(ds, success);
             }
-            MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
+            metadataProvider.getLocks().unlock();
         }
     }
 
@@ -2676,15 +2662,11 @@
         String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo());
         String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
         String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
-
+        String fullyQualifiedDatasetNameTo =
+                DatasetUtil.isFullyQualifiedName(datasetNameTo) ? datasetNameTo : dataverseNameTo + '.' + datasetNameTo;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        List<String> readDataverses = new ArrayList<>();
-        readDataverses.add(dataverseNameFrom);
-        List<String> readDatasets = new ArrayList<>();
-        readDatasets.add(datasetNameFrom);
-        MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseNameTo, datasetNameTo, readDataverses,
-                readDatasets);
+        MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(metadataProvider.getLocks(), fullyQualifiedDatasetNameTo);
         try {
             prepareRunExternalRuntime(metadataProvider, hcc, pregelixStmt, dataverseNameFrom, dataverseNameTo,
                     datasetNameFrom, datasetNameTo, mdTxnCtx);
@@ -2729,8 +2711,7 @@
             }
             throw e;
         } finally {
-            MetadataLockManager.INSTANCE.insertDeleteUpsertEnd(dataverseNameTo, datasetNameTo, readDataverses,
-                    readDatasets);
+            metadataProvider.getLocks().unlock();
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 002e270..7bd1e62 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -111,93 +111,98 @@
                         for (Dataverse dataverse : dataverses) {
                             if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
                                 MetadataProvider metadataProvider = new MetadataProvider(dataverse, componentProvider);
-                                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
-                                        dataverse.getDataverseName());
-                                for (Dataset dataset : datasets) {
-                                    if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-                                        // External dataset
-                                        // Get indexes
-                                        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
-                                                dataset.getDataverseName(), dataset.getDatasetName());
-                                        if (!indexes.isEmpty()) {
-                                            // Get the state of the dataset
-                                            ExternalDatasetDetails dsd =
-                                                    (ExternalDatasetDetails) dataset.getDatasetDetails();
-                                            TransactionState datasetState = dsd.getState();
-                                            if (datasetState == TransactionState.BEGIN) {
-                                                List<ExternalFile> files = MetadataManager.INSTANCE
-                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
-                                                // if persumed abort, roll backward
-                                                // 1. delete all pending files
-                                                for (ExternalFile file : files) {
-                                                    if (file.getPendingOp() != ExternalFilePendingOp.NO_OP) {
-                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                    }
-                                                }
-                                                // 2. clean artifacts in NCs
-                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations
-                                                        .buildAbortOp(dataset, indexes, metadataProvider);
-                                                executeHyracksJob(jobSpec);
-                                                // 3. correct the dataset state
-                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
-                                                        .setState(TransactionState.COMMIT);
-                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
-                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                                            } else if (datasetState == TransactionState.READY_TO_COMMIT) {
-                                                List<ExternalFile> files = MetadataManager.INSTANCE
-                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
-                                                // if ready to commit, roll forward
-                                                // 1. commit indexes in NCs
-                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations
-                                                        .buildRecoverOp(dataset, indexes, metadataProvider);
-                                                executeHyracksJob(jobSpec);
-                                                // 2. add pending files in metadata
-                                                for (ExternalFile file : files) {
-                                                    if (file.getPendingOp() == ExternalFilePendingOp.ADD_OP) {
-                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                        file.setPendingOp(ExternalFilePendingOp.NO_OP);
-                                                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.DROP_OP) {
-                                                        // find original file
-                                                        for (ExternalFile originalFile : files) {
-                                                            if (originalFile.getFileName()
-                                                                    .equals(file.getFileName())) {
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        file);
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                                break;
-                                                            }
+                                try {
+                                    List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                                            dataverse.getDataverseName());
+                                    for (Dataset dataset : datasets) {
+                                        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+                                            // External dataset
+                                            // Get indexes
+                                            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+                                                    dataset.getDataverseName(), dataset.getDatasetName());
+                                            if (!indexes.isEmpty()) {
+                                                // Get the state of the dataset
+                                                ExternalDatasetDetails dsd =
+                                                        (ExternalDatasetDetails) dataset.getDatasetDetails();
+                                                TransactionState datasetState = dsd.getState();
+                                                if (datasetState == TransactionState.BEGIN) {
+                                                    List<ExternalFile> files = MetadataManager.INSTANCE
+                                                            .getDatasetExternalFiles(mdTxnCtx, dataset);
+                                                    // if presumed abort, roll backward
+                                                    // 1. delete all pending files
+                                                    for (ExternalFile file : files) {
+                                                        if (file.getPendingOp() != ExternalFilePendingOp.NO_OP) {
+                                                            MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                                                         }
-                                                    } else if (file
-                                                            .getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
-                                                        // find original file
-                                                        for (ExternalFile originalFile : files) {
-                                                            if (originalFile.getFileName()
-                                                                    .equals(file.getFileName())) {
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        file);
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                                originalFile.setSize(file.getSize());
-                                                                MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
-                                                                        originalFile);
+                                                    }
+                                                    // 2. clean artifacts in NCs
+                                                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                                                    JobSpecification jobSpec = ExternalIndexingOperations
+                                                            .buildAbortOp(dataset, indexes, metadataProvider);
+                                                    executeHyracksJob(jobSpec);
+                                                    // 3. correct the dataset state
+                                                    ((ExternalDatasetDetails) dataset.getDatasetDetails())
+                                                            .setState(TransactionState.COMMIT);
+                                                    MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+                                                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                                                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                                                } else if (datasetState == TransactionState.READY_TO_COMMIT) {
+                                                    List<ExternalFile> files = MetadataManager.INSTANCE
+                                                            .getDatasetExternalFiles(mdTxnCtx, dataset);
+                                                    // if ready to commit, roll forward
+                                                    // 1. commit indexes in NCs
+                                                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                                                    JobSpecification jobSpec = ExternalIndexingOperations
+                                                            .buildRecoverOp(dataset, indexes, metadataProvider);
+                                                    executeHyracksJob(jobSpec);
+                                                    // 2. add pending files in metadata
+                                                    for (ExternalFile file : files) {
+                                                        if (file.getPendingOp() == ExternalFilePendingOp.ADD_OP) {
+                                                            MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                                                            file.setPendingOp(ExternalFilePendingOp.NO_OP);
+                                                            MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+                                                        } else if (file
+                                                                .getPendingOp() == ExternalFilePendingOp.DROP_OP) {
+                                                            // find original file
+                                                            for (ExternalFile originalFile : files) {
+                                                                if (originalFile.getFileName()
+                                                                        .equals(file.getFileName())) {
+                                                                    MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                            file);
+                                                                    MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                            originalFile);
+                                                                    break;
+                                                                }
+                                                            }
+                                                        } else if (file
+                                                                .getPendingOp() == ExternalFilePendingOp.APPEND_OP) {
+                                                            // find original file
+                                                            for (ExternalFile originalFile : files) {
+                                                                if (originalFile.getFileName()
+                                                                        .equals(file.getFileName())) {
+                                                                    MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                            file);
+                                                                    MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
+                                                                            originalFile);
+                                                                    originalFile.setSize(file.getSize());
+                                                                    MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
+                                                                            originalFile);
+                                                                }
                                                             }
                                                         }
                                                     }
+                                                    // 3. correct the dataset state
+                                                    ((ExternalDatasetDetails) dataset.getDatasetDetails())
+                                                            .setState(TransactionState.COMMIT);
+                                                    MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+                                                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                                                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                                                 }
-                                                // 3. correct the dataset state
-                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
-                                                        .setState(TransactionState.COMMIT);
-                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
-                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                                             }
                                         }
                                     }
+                                } finally {
+                                    metadataProvider.getLocks().unlock();
                                 }
                             }
                         }
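
Note on the recovery block above: for a dataset left in READY_TO_COMMIT, the patch rolls the metadata forward so it matches what the NCs already committed. A self-contained sketch of that reconciliation rule follows; the Catalog interface, the stand-in types, and the explicit self-match guard in the inner loop are assumptions of the sketch, not part of the patch.

    import java.util.ArrayList;
    import java.util.List;

    class RollForwardSketch {
        enum PendingOp { NO_OP, ADD_OP, DROP_OP, APPEND_OP }

        static class ExternalFile {
            final String name;
            long size;
            PendingOp pendingOp;
            ExternalFile(String name, long size, PendingOp op) {
                this.name = name;
                this.size = size;
                this.pendingOp = op;
            }
        }

        /** Stand-in for the MetadataManager add/drop calls used in the patch. */
        interface Catalog {
            void drop(ExternalFile f);
            void add(ExternalFile f);
        }

        static void rollForward(List<ExternalFile> files, Catalog catalog) {
            for (ExternalFile file : new ArrayList<>(files)) {
                if (file.pendingOp == PendingOp.ADD_OP) {
                    // the add already happened on the NCs: keep the file, clear the flag
                    catalog.drop(file);
                    file.pendingOp = PendingOp.NO_OP;
                    catalog.add(file);
                } else if (file.pendingOp == PendingOp.DROP_OP || file.pendingOp == PendingOp.APPEND_OP) {
                    for (ExternalFile original : files) {
                        if (original != file && original.name.equals(file.name)) {
                            // drop the pending marker and the stale original entry
                            catalog.drop(file);
                            catalog.drop(original);
                            if (file.pendingOp == PendingOp.APPEND_OP) {
                                // an append keeps the file, with the appended size
                                original.size = file.size;
                                catalog.add(original);
                            }
                            break;
                        }
                    }
                }
            }
        }
    }
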
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 7da3b32..fe08b8c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -169,12 +169,16 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         // Retrieves file splits of the dataset.
         MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        ARecordType recordType =
-                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
-        // Metadata transaction commits.
-        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        return recordType;
+        try {
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+            ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
+                    dataset.getItemTypeName());
+            // Metadata transaction commits.
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return recordType;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
     }
 }
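
The test fix above is the template for every call site this patch touches: once a MetadataProvider resolves an entity it holds read locks, and a helper that returns without unlocking would leave them held, blocking later DDL on the same dataset. A minimal sketch of the caller-side contract, with illustrative entity names:

    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    MetadataProvider metadataProvider = new MetadataProvider(null, new StorageComponentProvider());
    try {
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        // findDataset/findType register dataverse and dataset read locks in the provider
        Dataset dataset = metadataProvider.findDataset("TinySocial", "FacebookUsers");
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
    } finally {
        // runs on success and on failure, so no lock outlives the statement
        metadataProvider.getLocks().unlock();
    }
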
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 53f5f62..4b2af80 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -162,11 +162,10 @@
     }
 
     public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
-            Dataset dataset, IAType[] primaryKeyTypes,
-            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
-            Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes,
-            List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider)
-            throws AlgebricksException, HyracksDataException {
+            Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
+            int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+            StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators,
                 storageComponentProvider);
@@ -301,8 +300,13 @@
                 MetadataUtil.PENDING_NO_OP);
         Index index = primaryIndexInfo.getIndex();
         MetadataProvider mdProvider = new MetadataProvider(dataverse, storageComponentProvider);
-        return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
-                primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
+        try {
+            return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType,
+                    primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
+                    primaryIndexInfo.mergePolicyProperties);
+        } finally {
+            mdProvider.getLocks().unlock();
+        }
     }
 
     public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index f3bebbe..1d8ae5e 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -41,10 +41,8 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 
 /**
  * Represents the AQL statement for subscribing to a feed.
@@ -70,9 +68,9 @@
     public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
         this.query = new Query(false);
         EntityId sourceFeedId = connectionRequest.getReceivingFeedId();
-        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
-                connectionRequest.getReceivingFeedId().getDataverse(),
-                connectionRequest.getReceivingFeedId().getEntityName());
+        Feed subscriberFeed =
+                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
+                        connectionRequest.getReceivingFeedId().getEntityName());
         if (subscriberFeed == null) {
             throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
         }
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java
index 9268422..57370c5 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/visitor/AqlDeleteRewriteVisitor.java
@@ -41,17 +41,17 @@
 
     @Override
     public Void visit(DeleteStatement deleteStmt, Void visitArg) {
-        List<Expression> arguments = new ArrayList<Expression>();
+        List<Expression> arguments = new ArrayList<>();
         Identifier dataverseName = deleteStmt.getDataverseName();
         Identifier datasetName = deleteStmt.getDatasetName();
         String arg = dataverseName == null ? datasetName.getValue()
                 : dataverseName.getValue() + "." + datasetName.getValue();
         LiteralExpr argumentLiteral = new LiteralExpr(new StringLiteral(arg));
         arguments.add(argumentLiteral);
-        CallExpr callExpression = new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1),
-                arguments);
+        CallExpr callExpression =
+                new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1), arguments);
 
-        List<Clause> clauseList = new ArrayList<Clause>();
+        List<Clause> clauseList = new ArrayList<>();
         VariableExpr var = deleteStmt.getVariableExpr();
         Clause forClause = new ForClause(var, callExpression);
         clauseList.add(forClause);
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index ba699f0..b2909c4 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -1015,10 +1015,6 @@
   VariableExpr var = null;
   Expression condition = null;
   Pair<Identifier, Identifier> nameComponents;
-  // This is related to the new metadata lock management
-  setDataverses(new ArrayList<String>());
-  setDatasets(new ArrayList<String>());
-
 }
 {
   <DELETE> var = Variable()
@@ -1029,13 +1025,7 @@
   (<WHERE> condition = Expression())?
     {
-      // First we get the dataverses and datasets that we want to lock
-      List<String> dataverses = getDataverses();
-      List<String> datasets = getDatasets();
-      // we remove the pointer to the dataverses and datasets
-      setDataverses(null);
-      setDatasets(null);
       return new DeleteStatement(var, nameComponents.first, nameComponents.second,
-          condition, getVarCounter(), dataverses, datasets);
+          condition, getVarCounter());
     }
 }
 
@@ -1621,9 +1612,6 @@
 Query Query() throws ParseException:
 {
   Query query = new Query(false);
-  // we set the pointers to the dataverses and datasets lists to fill them with entities to be locked
-  setDataverses(query.getDataverses());
-  setDatasets(query.getDatasets());
   Expression expr;
 }
 {
@@ -1631,9 +1619,6 @@
     {
       query.setBody(expr);
       query.setVarCounter(getVarCounter());
-      // we remove the pointers to the locked entities before we return the query object
-      setDataverses(null);
-      setDatasets(null);
       return query;
     }
 }
@@ -2282,12 +2267,6 @@
       LiteralExpr ds = new LiteralExpr();
       ds.setValue( new StringLiteral(name) );
       nameArg = ds;
-      if(arg2 != null){
-          addDataverse(arg1.toString());
-          addDataset(name);
-      } else {
-          addDataset(defaultDataverse + "." + name);
-      }
     }
   | ( <LEFTPAREN> nameArg = Expression() <RIGHTPAREN> ) )
     {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java
index 1195d37..771ae16 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/parser/ScopeChecker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.lang.common.parser;
 
-import java.util.List;
 import java.util.Stack;
 
 import org.apache.asterix.common.functions.FunctionSignature;
@@ -35,17 +34,14 @@
 
     protected Counter varCounter = new Counter(-1);
 
-    protected Stack<Scope> scopeStack = new Stack<Scope>();
+    protected Stack<Scope> scopeStack = new Stack<>();
 
-    protected Stack<Scope> forbiddenScopeStack = new Stack<Scope>();
+    protected Stack<Scope> forbiddenScopeStack = new Stack<>();
 
     protected String[] inputLines;
 
     protected String defaultDataverse;
 
-    private List<String> dataverses;
-    private List<String> datasets;
-
     public ScopeChecker() {
         scopeStack.push(RootScopeFactory.createRootScope(this));
     }
@@ -323,32 +319,4 @@
         extract.append(inputLines[endLine - 1].substring(0, endColumn - 1));
         return extract.toString().trim();
     }
-
-    public void addDataverse(String dataverseName) {
-        if (dataverses != null) {
-            dataverses.add(dataverseName);
-        }
-    }
-
-    public void addDataset(String datasetName) {
-        if (datasets != null) {
-            datasets.add(datasetName);
-        }
-    }
-
-    public void setDataverses(List<String> dataverses) {
-        this.dataverses = dataverses;
-    }
-
-    public void setDatasets(List<String> datasets) {
-        this.datasets = datasets;
-    }
-
-    public List<String> getDataverses() {
-        return dataverses;
-    }
-
-    public List<String> getDatasets() {
-        return datasets;
-    }
 }
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java
index 3bd309a..7bf0db2 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DeleteStatement.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.lang.common.statement;
 
-import java.util.List;
-
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
@@ -35,19 +33,15 @@
     private Identifier datasetName;
     private Expression condition;
     private int varCounter;
-    private List<String> dataverses;
-    private List<String> datasets;
     private Query rewrittenQuery;
 
     public DeleteStatement(VariableExpr vars, Identifier dataverseName, Identifier datasetName, Expression condition,
-            int varCounter, List<String> dataverses, List<String> datasets) {
+            int varCounter) {
         this.vars = vars;
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.condition = condition;
         this.varCounter = varCounter;
-        this.dataverses = dataverses;
-        this.datasets = datasets;
     }
 
     @Override
@@ -88,18 +82,9 @@
         return visitor.visit(this, arg);
     }
 
-    public List<String> getDataverses() {
-        return dataverses;
-    }
-
-    public List<String> getDatasets() {
-        return datasets;
-    }
-
     @Override
     public int hashCode() {
-        return ObjectUtils.hashCodeMulti(condition, datasetName, datasets, dataverseName, dataverses, rewrittenQuery,
-                vars);
+        return ObjectUtils.hashCodeMulti(condition, datasetName, dataverseName, rewrittenQuery, vars);
     }
 
     @Override
@@ -111,11 +96,11 @@
             return false;
         }
         DeleteStatement target = (DeleteStatement) object;
-        boolean equals = ObjectUtils.equals(condition, target.condition)
-                && ObjectUtils.equals(datasetName, target.datasetName) && ObjectUtils.equals(datasets, target.datasets)
-                && ObjectUtils.equals(dataverseName, target.dataverseName);
-        return equals && ObjectUtils.equals(dataverses, target.dataverses)
-                && ObjectUtils.equals(rewrittenQuery, target.rewrittenQuery) && ObjectUtils.equals(vars, target.vars);
+        boolean equals =
+                ObjectUtils.equals(condition, target.condition) && ObjectUtils.equals(datasetName, target.datasetName)
+                        && ObjectUtils.equals(dataverseName, target.dataverseName);
+        return equals && ObjectUtils.equals(rewrittenQuery, target.rewrittenQuery)
+                && ObjectUtils.equals(vars, target.vars);
     }
 
     @Override
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java
index 2aee087..7274ae3 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/Query.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.lang.common.statement;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -34,21 +33,16 @@
     private boolean topLevel = true;
     private Expression body;
     private int varCounter;
-    private List<String> dataverses = new ArrayList<>();
-    private List<String> datasets = new ArrayList<>();
 
     public Query(boolean explain) {
         this.explain = explain;
     }
 
-    public Query(boolean explain, boolean topLevel, Expression body, int varCounter, List<String> dataverses,
-            List<String> datasets) {
+    public Query(boolean explain, boolean topLevel, Expression body, int varCounter) {
         this.explain = explain;
         this.topLevel = topLevel;
         this.body = body;
         this.varCounter = varCounter;
-        this.dataverses.addAll(dataverses);
-        this.datasets.addAll(datasets);
     }
 
     @Override
@@ -99,25 +93,9 @@
         return visitor.visit(this, arg);
     }
 
-    public void setDataverses(List<String> dataverses) {
-        this.dataverses = dataverses;
-    }
-
-    public void setDatasets(List<String> datasets) {
-        this.datasets = datasets;
-    }
-
-    public List<String> getDataverses() {
-        return dataverses;
-    }
-
-    public List<String> getDatasets() {
-        return datasets;
-    }
-
     @Override
     public int hashCode() {
-        return ObjectUtils.hashCodeMulti(body, datasets, dataverses, topLevel, explain);
+        return ObjectUtils.hashCodeMulti(body, topLevel, explain);
     }
 
     @Override
@@ -129,9 +107,7 @@
             return false;
         }
         Query target = (Query) object;
-        return explain == target.explain && ObjectUtils.equals(body, target.body)
-                && ObjectUtils.equals(datasets, target.datasets) && ObjectUtils.equals(dataverses, target.dataverses)
-                && topLevel == target.topLevel;
+        return explain == target.explain && ObjectUtils.equals(body, target.body) && topLevel == target.topLevel;
     }
 
     @Override
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
index 9b419f1..58c81a6 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/visitor/CloneAndSubstituteVariablesVisitor.java
@@ -102,7 +102,7 @@
         if (gc.hasGroupFieldList()) {
             for (Pair<Expression, Identifier> varId : gc.getGroupFieldList()) {
                 Expression newExpr = (Expression) varId.first.accept(this, env).first;
-                newGroupFieldList.add(new Pair<Expression, Identifier>(newExpr, varId.second));
+                newGroupFieldList.add(new Pair<>(newExpr, varId.second));
             }
         }
         GroupbyClause newGroup = new GroupbyClause(newGbyList, newDecorList, newWithMap, newGroupVar, newGroupFieldList,
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java
index a1c17fc..6346994 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/rewrites/visitor/SqlppGroupBySugarVisitor.java
@@ -123,15 +123,14 @@
             // Reference to a field in the group variable.
             if (fieldVars.contains(usedVar)) {
                 // Rewrites to a reference to a field in the group variable.
-                varExprMap.put(usedVar,
-                                new FieldAccessor(fromBindingVar, SqlppVariableUtil.toUserDefinedVariableName(usedVar
-                                        .getVar())));
+                varExprMap.put(usedVar, new FieldAccessor(fromBindingVar,
+                        SqlppVariableUtil.toUserDefinedVariableName(usedVar.getVar())));
             }
         }
 
         // Select clause.
-        SelectElement selectElement = new SelectElement(
-                SqlppRewriteUtil.substituteExpression(expr, varExprMap, context));
+        SelectElement selectElement =
+                new SelectElement(SqlppRewriteUtil.substituteExpression(expr, varExprMap, context));
         SelectClause selectClause = new SelectClause(selectElement, null, false);
 
         // Construct the select expression.
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java
index a066110..a8c5ddc 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppRewriteUtil.java
@@ -42,8 +42,8 @@
 
     // Applying sugar rewriting for group-by.
     public static Expression rewriteExpressionUsingGroupVariable(VariableExpr groupVar,
-            Collection<VariableExpr> fieldVars, ILangExpression expr,
-            LangRewritingContext context) throws CompilationException {
+            Collection<VariableExpr> fieldVars, ILangExpression expr, LangRewritingContext context)
+            throws CompilationException {
         SqlppGroupBySugarVisitor visitor = new SqlppGroupBySugarVisitor(context, groupVar, fieldVars);
         return expr.accept(visitor, null);
     }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
index 7c2a1747..37e4e26 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/DeepCopyVisitor.java
@@ -132,8 +132,7 @@
     @Override
     public Projection visit(Projection projection, Void arg) throws CompilationException {
         return new Projection(projection.star() ? null : (Expression) projection.getExpression().accept(this, arg),
-                projection.getName(),
-                projection.star(), projection.exprStar());
+                projection.getName(), projection.star(), projection.exprStar());
     }
 
     @Override
@@ -233,8 +232,7 @@
 
     @Override
     public Query visit(Query q, Void arg) throws CompilationException {
-        return new Query(q.isExplain(), q.isTopLevel(), (Expression) q.getBody().accept(this, arg), q.getVarCounter(),
-                q.getDataverses(), q.getDatasets());
+        return new Query(q.isExplain(), q.isTopLevel(), (Expression) q.getBody().accept(this, arg), q.getVarCounter());
     }
 
     @Override
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
index 9579741..8904241 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppCloneAndSubstituteVariablesVisitor.java
@@ -86,8 +86,8 @@
             VariableSubstitutionEnvironment env) throws CompilationException {
         VariableExpr leftVar = fromTerm.getLeftVariable();
         VariableExpr newLeftVar = generateNewVariable(context, leftVar);
-        VariableExpr newLeftPosVar = fromTerm.hasPositionalVariable() ? generateNewVariable(context,
-                fromTerm.getPositionalVariable()) : null;
+        VariableExpr newLeftPosVar = fromTerm.hasPositionalVariable()
+                ? generateNewVariable(context, fromTerm.getPositionalVariable()) : null;
         Expression newLeftExpr = (Expression) visitUnnesBindingExpression(fromTerm.getLeftExpression(), env).first;
         List<AbstractBinaryCorrelateClause> newCorrelateClauses = new ArrayList<>();
 
@@ -123,8 +123,8 @@
             VariableSubstitutionEnvironment env) throws CompilationException {
         VariableExpr rightVar = joinClause.getRightVariable();
         VariableExpr newRightVar = generateNewVariable(context, rightVar);
-        VariableExpr newRightPosVar = joinClause.hasPositionalVariable() ? generateNewVariable(context,
-                joinClause.getPositionalVariable()) : null;
+        VariableExpr newRightPosVar = joinClause.hasPositionalVariable()
+                ? generateNewVariable(context, joinClause.getPositionalVariable()) : null;
 
         // Visits the right expression.
         Expression newRightExpr = (Expression) visitUnnesBindingExpression(joinClause.getRightExpression(), env).first;
@@ -138,8 +138,8 @@
         // The condition can refer to the newRightVar and newRightPosVar.
         Expression conditionExpr = (Expression) joinClause.getConditionExpression().accept(this, currentEnv).first;
 
-        JoinClause newJoinClause = new JoinClause(joinClause.getJoinType(), newRightExpr, newRightVar, newRightPosVar,
-                conditionExpr);
+        JoinClause newJoinClause =
+                new JoinClause(joinClause.getJoinType(), newRightExpr, newRightVar, newRightPosVar, conditionExpr);
         return new Pair<>(newJoinClause, currentEnv);
     }
 
@@ -148,8 +148,8 @@
             VariableSubstitutionEnvironment env) throws CompilationException {
         VariableExpr rightVar = nestClause.getRightVariable();
         VariableExpr newRightVar = generateNewVariable(context, rightVar);
-        VariableExpr newRightPosVar = nestClause.hasPositionalVariable() ? generateNewVariable(context,
-                nestClause.getPositionalVariable()) : null;
+        VariableExpr newRightPosVar = nestClause.hasPositionalVariable()
+                ? generateNewVariable(context, nestClause.getPositionalVariable()) : null;
 
         // Visits the right expression.
         Expression rightExpr = (Expression) nestClause.getRightExpression().accept(this, env).first;
@@ -163,8 +163,8 @@
         // The condition can refer to the newRightVar and newRightPosVar.
         Expression conditionExpr = (Expression) nestClause.getConditionExpression().accept(this, currentEnv).first;
 
-        NestClause newJoinClause = new NestClause(nestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar,
-                conditionExpr);
+        NestClause newJoinClause =
+                new NestClause(nestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar, conditionExpr);
         return new Pair<>(newJoinClause, currentEnv);
     }
 
@@ -173,8 +173,8 @@
             VariableSubstitutionEnvironment env) throws CompilationException {
         VariableExpr rightVar = unnestClause.getRightVariable();
         VariableExpr newRightVar = generateNewVariable(context, rightVar);
-        VariableExpr newRightPosVar = unnestClause.hasPositionalVariable() ? generateNewVariable(context,
-                unnestClause.getPositionalVariable()) : null;
+        VariableExpr newRightPosVar = unnestClause.hasPositionalVariable()
+                ? generateNewVariable(context, unnestClause.getPositionalVariable()) : null;
 
         // Visits the right expression.
         Expression rightExpr = (Expression) visitUnnesBindingExpression(unnestClause.getRightExpression(), env).first;
@@ -186,8 +186,8 @@
             currentEnv.removeSubstitution(newRightPosVar);
         }
         // The condition can refer to the newRightVar and newRightPosVar.
-        UnnestClause newJoinClause = new UnnestClause(unnestClause.getJoinType(), rightExpr, newRightVar,
-                newRightPosVar);
+        UnnestClause newJoinClause =
+                new UnnestClause(unnestClause.getJoinType(), rightExpr, newRightVar, newRightPosVar);
         return new Pair<>(newJoinClause, currentEnv);
     }
 
@@ -265,13 +265,13 @@
             VariableSubstitutionEnvironment env) throws CompilationException {
         boolean distinct = selectClause.distinct();
         if (selectClause.selectElement()) {
-            Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectElement = selectClause.getSelectElement()
-                    .accept(this, env);
+            Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectElement =
+                    selectClause.getSelectElement().accept(this, env);
             return new Pair<>(new SelectClause((SelectElement) newSelectElement.first, null, distinct),
                     newSelectElement.second);
         } else {
-            Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectRegular = selectClause.getSelectRegular()
-                    .accept(this, env);
+            Pair<ILangExpression, VariableSubstitutionEnvironment> newSelectRegular =
+                    selectClause.getSelectRegular().accept(this, env);
             return new Pair<>(new SelectClause(null, (SelectRegular) newSelectRegular.first, distinct),
                     newSelectRegular.second);
         }
@@ -280,8 +280,8 @@
     @Override
     public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(SelectElement selectElement,
             VariableSubstitutionEnvironment env) throws CompilationException {
-        Pair<ILangExpression, VariableSubstitutionEnvironment> newExpr = selectElement.getExpression()
-                .accept(this, env);
+        Pair<ILangExpression, VariableSubstitutionEnvironment> newExpr =
+                selectElement.getExpression().accept(this, env);
         return new Pair<>(new SelectElement((Expression) newExpr.first), newExpr.second);
     }
 
@@ -318,12 +318,12 @@
                 SetOperationInput newRightInput;
                 SetOperationInput rightInput = right.getSetOperationRightInput();
                 if (rightInput.selectBlock()) {
-                    Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = rightInput.getSelectBlock()
-                            .accept(this, env);
+                    Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult =
+                            rightInput.getSelectBlock().accept(this, env);
                     newRightInput = new SetOperationInput((SelectBlock) rightResult.first, null);
                 } else {
-                    Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult = rightInput.getSubquery()
-                            .accept(this, env);
+                    Pair<ILangExpression, VariableSubstitutionEnvironment> rightResult =
+                            rightInput.getSubquery().accept(this, env);
                     newRightInput = new SetOperationInput(null, (SelectExpression) rightResult.first);
                 }
                 newRightInputs.add(new SetOperationRight(right.getSetOpType(), right.isSetSemantics(), newRightInput));
@@ -392,10 +392,10 @@
     public Pair<ILangExpression, VariableSubstitutionEnvironment> visit(CaseExpression caseExpr,
             VariableSubstitutionEnvironment env) throws CompilationException {
         Expression conditionExpr = (Expression) caseExpr.getConditionExpr().accept(this, env).first;
-        List<Expression> whenExprList = VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getWhenExprs(),
-                env, this);
-        List<Expression> thenExprList = VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getThenExprs(),
-                env, this);
+        List<Expression> whenExprList =
+                VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getWhenExprs(), env, this);
+        List<Expression> thenExprList =
+                VariableCloneAndSubstitutionUtil.visitAndCloneExprList(caseExpr.getThenExprs(), env, this);
         Expression elseExpr = (Expression) caseExpr.getElseExpr().accept(this, env).first;
         CaseExpression newCaseExpr = new CaseExpression(conditionExpr, whenExprList, thenExprList, elseExpr);
         return new Pair<>(newCaseExpr, env);
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java
index 503437c..680ce55 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/SqlppDeleteRewriteVisitor.java
@@ -58,8 +58,8 @@
                 : dataverseName.getValue() + "." + datasetName.getValue();
         LiteralExpr argumentLiteral = new LiteralExpr(new StringLiteral(arg));
         arguments.add(argumentLiteral);
-        CallExpr callExpression = new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1),
-                arguments);
+        CallExpr callExpression =
+                new CallExpr(new FunctionSignature(FunctionConstants.ASTERIX_NS, "dataset", 1), arguments);
 
         // From clause.
         VariableExpr var = deleteStmt.getVariableExpr();
@@ -84,7 +84,7 @@
         SelectBlock selectBlock = new SelectBlock(selectClause, fromClause, null, whereClause, null, null, null);
         SelectSetOperation selectSetOperation = new SelectSetOperation(new SetOperationInput(selectBlock, null), null);
         SelectExpression selectExpression = new SelectExpression(null, selectSetOperation, null, null, false);
-        Query query = new Query(false, false, selectExpression, 0, new ArrayList<>(), new ArrayList<>());
+        Query query = new Query(false, false, selectExpression, 0);
         query.setBody(selectExpression);
 
         // return the delete statement.
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index e553c4d..e6a2951 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -1029,10 +1029,6 @@
   VariableExpr varExpr = null;
   Expression condition = null;
   Pair<Identifier, Identifier> nameComponents;
-  // This is related to the new metadata lock management
-  setDataverses(new ArrayList<String>());
-  setDatasets(new ArrayList<String>());
-
 }
 {
   <DELETE>
@@ -1040,20 +1036,13 @@
          ((<AS>)? varExpr = Variable())?
   (<WHERE> condition = Expression())?
   {
-      // First we get the dataverses and datasets that we want to lock
-      List<String> dataverses = getDataverses();
-      List<String> datasets = getDatasets();
-      // we remove the pointer to the dataverses and datasets
-      setDataverses(null);
-      setDatasets(null);
-
       if(varExpr == null){
         varExpr = new VariableExpr();
         VarIdentifier var = SqlppVariableUtil.toInternalVariableIdentifier(nameComponents.second.getValue());
         varExpr.setVar(var);
       }
       return new DeleteStatement(varExpr, nameComponents.first, nameComponents.second,
-          condition, getVarCounter(), dataverses, datasets);
+          condition, getVarCounter());
   }
 }
 
@@ -1738,9 +1727,6 @@
 Query Query(boolean explain) throws ParseException:
 {
   Query query = new Query(explain);
-  // we set the pointers to the dataverses and datasets lists to fill them with entities to be locked
-  setDataverses(query.getDataverses());
-  setDatasets(query.getDatasets());
   Expression expr;
 }
 {
@@ -1751,9 +1737,6 @@
   )
   {
     query.setBody(expr);
-    // we remove the pointers to the locked entities before we return the query object
-    setDataverses(null);
-    setDatasets(null);
     return query;
   }
 }
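
Both grammars now parse DELETE and Query without any lock bookkeeping: the dataverse/dataset lists, their plumbing in ScopeChecker, and the side effects in the dataset() production are gone. Locking moves from parse time to compile time, where name resolution and lock acquisition become a single step. Sketch, with illustrative names:

    // before: the parser pre-collected "Default.Customers" so locks could be taken ahead of compilation
    // after: the first metadata resolution during compilation takes the locks as a side effect
    Dataset ds = metadataProvider.findDataset("Default", "Customers");
    // ... compile against ds ...; metadataProvider.getLocks().unlock() releases them afterwards
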
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 5d44d0b..b2c2e81 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -400,7 +400,7 @@
             ARecordType aRecType = (ARecordType) datatype.getDatatype();
             return new Datatype(
                     datatype.getDataverseName(), datatype.getDatatypeName(), new ARecordType(aRecType.getTypeName(),
-                    aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
+                            aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
                     datatype.getIsAnonymous());
         }
         try {
@@ -917,7 +917,7 @@
 
     @Override
     public ExternalFile getExternalFile(MetadataTransactionContext ctx, String dataverseName, String datasetName,
-                                        Integer fileNumber) throws MetadataException {
+            Integer fileNumber) throws MetadataException {
         ExternalFile file;
         try {
             file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber);
@@ -985,8 +985,7 @@
 
     @Override
     public <T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx,
-                                                                    IExtensionMetadataSearchKey searchKey)
-            throws MetadataException {
+            IExtensionMetadataSearchKey searchKey) throws MetadataException {
         try {
             return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey);
         } catch (RemoteException e) {
@@ -1021,14 +1020,14 @@
                 return;
             }
             try {
-                metadataNode = proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(),
-                        TimeUnit.SECONDS);
+                metadataNode =
+                        proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), TimeUnit.SECONDS);
                 if (metadataNode != null) {
                     rebindMetadataNode = false;
                 } else {
                     throw new HyracksDataException("The MetadataNode failed to bind before the configured timeout ("
-                            + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was " +
-                            "configured to run on NC: " + metadataProperties.getMetadataNodeName());
+                            + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was "
+                            + "configured to run on NC: " + metadataProperties.getMetadataNodeName());
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index bd1c7d1..8ce53d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -695,11 +695,10 @@
     /**
      * Feed Connection Related Metadata operations
      */
-    void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection)
-            throws MetadataException;
+    void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) throws MetadataException;
 
-    void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
-            String datasetName) throws MetadataException;
+    void dropFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName, String datasetName)
+            throws MetadataException;
 
     FeedConnection getFeedConnection(MetadataTransactionContext ctx, String dataverseName, String feedName,
             String datasetName) throws MetadataException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java
index 0560bd0..f3913e1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeDataflowHelperFactoryProvider.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.metadata.api.IIndexDataflowHelperFactoryProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f8f4a0e..aa76122 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -71,6 +71,8 @@
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
+import org.apache.asterix.metadata.lock.LockList;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
@@ -150,6 +152,7 @@
     private final StorageProperties storageProperties;
     private final ILibraryManager libraryManager;
     private final Dataverse defaultDataverse;
+    private final LockList locks;
 
     private MetadataTransactionContext mdTxnCtx;
     private boolean isWriteTransaction;
@@ -160,7 +163,7 @@
     private ResultSetId resultSetId;
     private IResultSerializerFactoryProvider resultSerializerFactoryProvider;
     private JobId jobId;
-    private Map<String, Integer> locks;
+    private Map<String, Integer> externalDataLocks;
     private boolean isTemporaryDatasetWriteJob = true;
     private boolean blockingOperatorDisabled = false;
 
@@ -171,6 +174,7 @@
         libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
         metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory();
         primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory();
+        locks = new LockList();
     }
 
     public String getPropertyValue(String propertyName) {
@@ -280,12 +284,12 @@
         return storageProperties;
     }
 
-    public Map<String, Integer> getLocks() {
-        return locks;
+    public Map<String, Integer> getExternalDataLocks() {
+        return externalDataLocks;
     }
 
-    public void setLocks(Map<String, Integer> locks) {
-        this.locks = locks;
+    public void setExternalDataLocks(Map<String, Integer> locks) {
+        this.externalDataLocks = locks;
     }
 
     /**
@@ -302,6 +306,9 @@
         if (dv == null) {
             return null;
         }
+        String fqName = dv + '.' + dataset;
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, dv);
+        MetadataLockManager.INSTANCE.acquireDatasetReadLock(locks, fqName);
         return MetadataManagerUtil.findDataset(mdTxnCtx, dv, dataset);
     }
 
@@ -917,7 +924,8 @@
 
     public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
             String targetIdxName, boolean temp) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getDatasetSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
+        return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx,
+                targetIdxName, temp);
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
@@ -939,8 +947,8 @@
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
             String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
-                targetIdxName, create);
+        return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(
+                findDataset(dataverseName, datasetName), mdTxnCtx, targetIdxName, create);
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
@@ -1827,16 +1835,14 @@
                         splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
                         tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
                         fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory,
-                        searchCallbackFactory, indexName,
-                        prevFieldPermutation, metadataPageManagerFactory);
+                        searchCallbackFactory, indexName, prevFieldPermutation, metadataPageManagerFactory);
             } else {
                 op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
                         appContext.getStorageManager(), splitsAndConstraint.first,
                         appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
                         invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
                         indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory,
-                        indexName,
-                        metadataPageManagerFactory);
+                        indexName, metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
@@ -2064,4 +2070,8 @@
                 ds.getDatasetDetails().isTemp());
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
+
+    public LockList getLocks() {
+        return locks;
+    }
 }
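
findDataset is now the choke point where SQL++ queries pick up metadata locks: it takes the dataverse read lock before the dataset read lock, so acquisition order stays consistent across statements. The LockList it records into is only partially visible in this diff; the sketch below shows the behavior the calling code relies on (record on acquire, bulk release in reverse order) and is an assumption about the real class in org.apache.asterix.metadata.lock, not a copy of it.

    import java.util.ArrayDeque;
    import java.util.Deque;

    class LockListSketch {
        /** Minimal stand-in for IMetadataLock; the real interface also carries a Mode. */
        interface Lock {
            void acquire();
            void release();
        }

        private final Deque<Lock> held = new ArrayDeque<>();

        void add(Lock lock) {
            lock.acquire(); // take the lock first...
            held.push(lock); // ...then record it, so unlock() never misses a held lock
        }

        void unlock() {
            while (!held.isEmpty()) {
                held.pop().release(); // release in reverse acquisition order
            }
        }
    }
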
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e0607a6..572130d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -50,8 +50,8 @@
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.declared.BTreeDataflowHelperFactoryProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
 import org.apache.asterix.metadata.utils.ExternalIndexingOperations;
 import org.apache.asterix.metadata.utils.IndexUtil;
 import org.apache.asterix.metadata.utils.InvertedIndexDataflowHelperFactoryProvider;
@@ -283,9 +283,7 @@
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                IDataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
-                        dataverseName, datasetName);
-                if (listener.isEntityUsingDataset(ds)) {
+                if (listener.isEntityUsingDataset(this)) {
                     throw new CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
                             RecordUtil.toFullyQualifiedName(dataverseName, datasetName),
                             listener.getEntityId().toString());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 45e358f..584aead 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Datatype;
@@ -47,7 +48,6 @@
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -59,17 +59,16 @@
  */
 public class FeedMetadataUtil {
 
-    public static Dataset validateIfDatasetExists(String dataverse, String datasetName, MetadataTransactionContext ctx)
-            throws CompilationException {
-        Dataset dataset = MetadataManager.INSTANCE.getDataset(ctx, dataverse, datasetName);
+    public static Dataset validateIfDatasetExists(MetadataProvider metadataProvider, String dataverse,
+            String datasetName, MetadataTransactionContext ctx) throws AlgebricksException {
+        Dataset dataset = metadataProvider.findDataset(dataverse, datasetName);
         if (dataset == null) {
             throw new CompilationException("Unknown target dataset :" + datasetName);
         }
 
         if (!dataset.getDatasetType().equals(DatasetType.INTERNAL)) {
             throw new CompilationException("Statement not applicable. Dataset " + datasetName
-                    + " is not of required type "
-                    + DatasetType.INTERNAL);
+                    + " is not of required type " + DatasetType.INTERNAL);
         }
         return dataset;
     }
@@ -87,8 +86,8 @@
             MetadataTransactionContext ctx) throws CompilationException {
         FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverse, policyName);
         if (feedPolicy == null) {
-            feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME,
-                    policyName);
+            feedPolicy =
+                    MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
             if (feedPolicy == null) {
                 throw new CompilationException("Unknown feed policy" + policyName);
             }
@@ -155,7 +154,7 @@
                 }
             }
         } catch (Exception e) {
-            throw new AsterixException("Invalid feed parameters. Exception Message:" + e.getMessage() , e);
+            throw new AsterixException("Invalid feed parameters. Exception Message:" + e.getMessage(), e);
         }
     }
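
Feed statements go through the same mechanism now: validateIfDatasetExists resolves the target via the provider instead of MetadataManager directly, so the lookup registers the dataverse and dataset read locks as a side effect. Caller-side sketch (surrounding names are illustrative):

    Dataset target = FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverse, datasetName, mdTxnCtx);
    // the locks taken by the lookup sit in metadataProvider.getLocks() and are
    // released with the rest of the statement's locks in its finally block
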
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
new file mode 100644
index 0000000..ffcff78
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.asterix.om.base.AMutableInt32;
+
+public class DatasetLock implements IMetadataLock {
+
+    private final ReentrantReadWriteLock dsLock;
+    private final ReentrantReadWriteLock dsModifyLock;
+    private final AMutableInt32 indexBuildCounter;
+
+    public DatasetLock() {
+        dsLock = new ReentrantReadWriteLock(true);
+        dsModifyLock = new ReentrantReadWriteLock(true);
+        indexBuildCounter = new AMutableInt32(0);
+    }
+
+    private void acquireReadLock() {
+        // query
+        // build index
+        // insert
+        dsLock.readLock().lock();
+    }
+
+    private void releaseReadLock() {
+        // query
+        // build index
+        // insert
+        dsLock.readLock().unlock();
+    }
+
+    private void acquireWriteLock() {
+        // create ds
+        // delete ds
+        // drop index
+        dsLock.writeLock().lock();
+    }
+
+    private void releaseWriteLock() {
+        // create ds
+        // delete ds
+        // drop index
+        dsLock.writeLock().unlock();
+    }
+
+    private void acquireReadModifyLock() {
+        // insert
+        dsModifyLock.readLock().lock();
+    }
+
+    private void releaseReadModifyLock() {
+        // insert
+        dsModifyLock.readLock().unlock();
+    }
+
+    private void acquireWriteModifyLock() {
+        // Build index statement
+        synchronized (indexBuildCounter) {
+            if (indexBuildCounter.getIntegerValue() > 0) {
+                indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1);
+            } else {
+                dsModifyLock.writeLock().lock();
+                indexBuildCounter.setValue(1);
+            }
+        }
+    }
+
+    private void releaseWriteModifyLock() {
+        // Build index statement
+        synchronized (indexBuildCounter) {
+            if (indexBuildCounter.getIntegerValue() == 1) {
+                dsModifyLock.writeLock().unlock();
+            }
+            indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1);
+        }
+    }
+
+    private void acquireRefreshLock() {
+        // Refresh External Dataset statement
+        dsModifyLock.writeLock().lock();
+    }
+
+    private void releaseRefreshLock() {
+        // Refresh External Dataset statement
+        dsModifyLock.writeLock().unlock();
+    }
+
+    @Override
+    public void acquire(IMetadataLock.Mode mode) {
+        switch (mode) {
+            case INDEX_BUILD:
+                acquireReadLock();
+                acquireWriteModifyLock();
+                break;
+            case MODIFY:
+                acquireReadLock();
+                acquireReadModifyLock();
+                break;
+            case REFRESH:
+                acquireReadLock();
+                acquireRefreshLock();
+                break;
+            case INDEX_DROP:
+            case WRITE:
+                acquireWriteLock();
+                break;
+            default:
+                acquireReadLock();
+                break;
+        }
+    }
+
+    @Override
+    public void release(IMetadataLock.Mode mode) {
+        switch (mode) {
+            case INDEX_BUILD:
+                releaseWriteModifyLock();
+                releaseReadLock();
+                break;
+            case MODIFY:
+                releaseReadModifyLock();
+                releaseReadLock();
+                break;
+            case REFRESH:
+                releaseRefreshLock();
+                releaseReadLock();
+                break;
+            case INDEX_DROP:
+            case WRITE:
+                releaseWriteLock();
+                break;
+            default:
+                releaseReadLock();
+                break;
+        }
+    }
+}
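A hedged usage sketch (not part of the patch) of the INDEX_BUILD mode above: every index build holds the shared read lock on dsLock, and indexBuildCounter lets concurrent builds share dsModifyLock's write lock, so builds run alongside each other while excluding MODIFY (inserts/deletes/upserts, which take the modify read lock) and REFRESH (which takes the modify write lock).

    package org.apache.asterix.metadata.lock;

    public class DatasetLockDemo {
        public static void main(String[] args) {
            DatasetLock lock = new DatasetLock();
            Runnable indexBuild = () -> {
                lock.acquire(IMetadataLock.Mode.INDEX_BUILD);
                try {
                    // scan the dataset and bulk-load the new secondary index
                } finally {
                    lock.release(IMetadataLock.Mode.INDEX_BUILD);
                }
            };
            new Thread(indexBuild).start();
            new Thread(indexBuild).start(); // both builds may proceed concurrently
        }
    }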
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
similarity index 94%
rename from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
rename to asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
index 2e35ed4..4d8bacf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalDatasetsRegistry.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/ExternalDatasetsRegistry.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.metadata.utils;
+package org.apache.asterix.metadata.lock;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -26,6 +26,7 @@
 
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.ExternalDatasetAccessManager;
 
 /**
  * This is a singleton class used to maintain the version of each external dataset with indexes
@@ -62,10 +63,10 @@
         Map<String, Integer> locks;
         String lockKey = dataset.getDataverseName() + "." + dataset.getDatasetName();
         // check first if the lock was acquired already
-        locks = metadataProvider.getLocks();
+        locks = metadataProvider.getExternalDataLocks();
         if (locks == null) {
             locks = new HashMap<>();
-            metadataProvider.setLocks(locks);
+            metadataProvider.setExternalDataLocks(locks);
         } else {
             // if dataset was accessed already by this job, return the registered version
             Integer version = locks.get(lockKey);
@@ -123,7 +124,7 @@
     }
 
     public void releaseAcquiredLocks(MetadataProvider metadataProvider) {
-        Map<String, Integer> locks = metadataProvider.getLocks();
+        Map<String, Integer> locks = metadataProvider.getExternalDataLocks();
         if (locks == null) {
             return;
         } else {
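A hedged sketch of the release path above, assuming a registry instance is at hand (the accessor below is illustrative, not confirmed by the patch):

    package org.apache.asterix.metadata.lock;

    import org.apache.asterix.metadata.declared.MetadataProvider;

    public class ExternalLocksCleanupDemo {
        // 'registry' stands for the process-wide ExternalDatasetsRegistry instance.
        static void cleanup(ExternalDatasetsRegistry registry, MetadataProvider metadataProvider) {
            // No-op when the provider recorded no external dataset accesses
            // (getExternalDataLocks() returns null); otherwise releases each version lock.
            registry.releaseAcquiredLocks(metadataProvider);
        }
    }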
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java
new file mode 100644
index 0000000..8af29ba
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/IMetadataLock.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+/**
+ * A metadata lock local to the compilation node.
+ */
+public interface IMetadataLock {
+
+    enum Mode {
+        READ,
+        MODIFY,
+        REFRESH,
+        INDEX_BUILD,
+        INDEX_DROP,
+        WRITE
+    }
+
+    /**
+     * Acquire a lock
+     *
+     * @param mode
+     *            lock mode
+     */
+    void acquire(IMetadataLock.Mode mode);
+
+    /**
+     * Release a lock
+     *
+     * @param mode
+     *            lock mode
+     */
+    void release(IMetadataLock.Mode mode);
+}
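One caveat worth spelling out, as a hedged sketch rather than part of the patch: release must be called with the same mode that was acquired, because implementations such as DatasetLock dispatch on the mode to decide which underlying locks to unwind. The sketch uses the MetadataLock implementation introduced below.

    package org.apache.asterix.metadata.lock;

    public class ModeContractDemo {
        public static void main(String[] args) {
            IMetadataLock lock = new MetadataLock();
            lock.acquire(IMetadataLock.Mode.WRITE);
            try {
                // mutate the protected metadata entity
            } finally {
                lock.release(IMetadataLock.Mode.WRITE); // must mirror the acquire mode
            }
        }
    }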
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
new file mode 100644
index 0000000..6e6f086
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+public class LockList {
+    List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new ArrayList<>();
+
+    public void add(IMetadataLock.Mode mode, IMetadataLock lock) {
+        lock.acquire(mode);
+        locks.add(Pair.of(mode, lock));
+    }
+
+    public void unlock() {
+        for (int i = locks.size() - 1; i >= 0; i--) {
+            Pair<IMetadataLock.Mode, IMetadataLock> pair = locks.get(i);
+            pair.getRight().release(pair.getLeft());
+        }
+    }
+
+    public void reset() {
+        locks.clear();
+    }
+}
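A hedged sketch of the intended LockList discipline: locks are acquired through add in a consistent global order and released once, in reverse acquisition order, when the statement finishes.

    package org.apache.asterix.metadata.lock;

    public class LockListDemo {
        public static void main(String[] args) {
            LockList locks = new LockList();
            try {
                locks.add(IMetadataLock.Mode.READ, new MetadataLock()); // e.g. a dataverse lock
                locks.add(IMetadataLock.Mode.MODIFY, new DatasetLock()); // e.g. a dataset lock
                // run the statement
            } finally {
                locks.unlock(); // releases the dataset lock first, then the dataverse lock
                locks.reset();  // unlock() does not clear the list; reset() does
            }
        }
    }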
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
new file mode 100644
index 0000000..c8fd64f
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class MetadataLock implements IMetadataLock {
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    @Override
+    public void acquire(IMetadataLock.Mode mode) {
+        switch (mode) {
+            case WRITE:
+                lock.writeLock().lock();
+                break;
+            default:
+                lock.readLock().lock();
+                break;
+        }
+    }
+
+    @Override
+    public void release(IMetadataLock.Mode mode) {
+        switch (mode) {
+            case WRITE:
+                lock.writeLock().unlock();
+                break;
+            default:
+                lock.readLock().unlock();
+                break;
+        }
+    }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
new file mode 100644
index 0000000..5ae3aa4
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.lock;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+
+public class MetadataLockManager {
+
+    public static final MetadataLockManager INSTANCE = new MetadataLockManager();
+    private static final Function<String, MetadataLock> LOCK_FUNCTION = key -> new MetadataLock();
+    private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
+
+    private final ConcurrentHashMap<String, MetadataLock> dataversesLocks;
+    private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
+    private final ConcurrentHashMap<String, MetadataLock> functionsLocks;
+    private final ConcurrentHashMap<String, MetadataLock> nodeGroupsLocks;
+    private final ConcurrentHashMap<String, MetadataLock> feedsLocks;
+    private final ConcurrentHashMap<String, MetadataLock> feedPolicyLocks;
+    private final ConcurrentHashMap<String, MetadataLock> compactionPolicyLocks;
+    private final ConcurrentHashMap<String, MetadataLock> dataTypeLocks;
+    private final ConcurrentHashMap<String, MetadataLock> extensionLocks;
+
+    private MetadataLockManager() {
+        dataversesLocks = new ConcurrentHashMap<>();
+        datasetsLocks = new ConcurrentHashMap<>();
+        functionsLocks = new ConcurrentHashMap<>();
+        nodeGroupsLocks = new ConcurrentHashMap<>();
+        feedsLocks = new ConcurrentHashMap<>();
+        feedPolicyLocks = new ConcurrentHashMap<>();
+        compactionPolicyLocks = new ConcurrentHashMap<>();
+        dataTypeLocks = new ConcurrentHashMap<>();
+        extensionLocks = new ConcurrentHashMap<>();
+    }
+
+    // Dataverse
+    public void acquireDataverseReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireDataverseWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // Dataset
+    public void acquireDatasetReadLock(LockList locks, String datasetName) {
+        DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireDatasetWriteLock(LockList locks, String datasetName) {
+        DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    public void acquireDatasetModifyLock(LockList locks, String datasetName) {
+        DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.MODIFY, lock);
+    }
+
+    public void acquireDatasetCreateIndexLock(LockList locks, String datasetName) {
+        DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.INDEX_BUILD, lock);
+    }
+
+    public void acquireExternalDatasetRefreshLock(LockList locks, String datasetName) {
+        DatasetLock lock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.REFRESH, lock);
+    }
+
+    // Function
+    public void acquireFunctionReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = functionsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireFunctionWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = functionsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // Node Group
+    public void acquireNodeGroupReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = nodeGroupsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireNodeGroupWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = nodeGroupsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // Feeds
+    public void acquireFeedReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = feedsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireFeedWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = feedsLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // Feed Policies
+    public void acquireFeedPolicyReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = feedPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireFeedPolicyWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = feedPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // CompactionPolicy
+    public void acquireCompactionPolicyReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = compactionPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireCompactionPolicyWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = compactionPolicyLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // DataType
+    public void acquireDataTypeReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = dataTypeLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireDataTypeWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = dataTypeLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    // Extensions
+    public void acquireExtensionReadLock(LockList locks, String dataverseName) {
+        MetadataLock lock = extensionLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.READ, lock);
+    }
+
+    public void acquireExtensionWriteLock(LockList locks, String dataverseName) {
+        MetadataLock lock = extensionLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
+        locks.add(IMetadataLock.Mode.WRITE, lock);
+    }
+
+    public void createDatasetBegin(LockList locks, String dataverseName, String itemTypeDataverseName,
+            String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
+            String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
+            boolean isDefaultCompactionPolicy) {
+        acquireDataverseReadLock(locks, dataverseName);
+        if (!dataverseName.equals(itemTypeDataverseName)) {
+            acquireDataverseReadLock(locks, itemTypeDataverseName);
+        }
+        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
+                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
+            acquireDataverseReadLock(locks, metaItemTypeDataverseName);
+        }
+        acquireDataTypeReadLock(locks, itemTypeFullyQualifiedName);
+        if (metaItemTypeFullyQualifiedName != null
+                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
+            acquireDataTypeReadLock(locks, metaItemTypeFullyQualifiedName);
+        }
+        acquireNodeGroupReadLock(locks, nodeGroupName);
+        if (!isDefaultCompactionPolicy) {
+            acquireCompactionPolicyReadLock(locks, compactionPolicyName);
+        }
+        acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void createIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetCreateIndexLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void createTypeBegin(LockList locks, String dataverseName, String itemTypeFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDataTypeWriteLock(locks, itemTypeFullyQualifiedName);
+    }
+
+    public void dropDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void dropIndexBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetWriteLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void dropTypeBegin(LockList locks, String dataverseName, String dataTypeFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDataTypeWriteLock(locks, dataTypeFullyQualifiedName);
+    }
+
+    public void functionStatementBegin(LockList locks, String dataverseName, String functionFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFunctionWriteLock(locks, functionFullyQualifiedName);
+    }
+
+    public void modifyDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void insertDeleteUpsertBegin(LockList locks, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, DatasetUtil.getDataverseFromFullyQualifiedName(datasetFullyQualifiedName));
+        acquireDatasetModifyLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void dropFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFeedWriteLock(locks, feedFullyQualifiedName);
+    }
+
+    public void dropFeedPolicyBegin(LockList locks, String dataverseName, String policyName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFeedPolicyWriteLock(locks, policyName);
+    }
+
+    public void startFeedBegin(LockList locks, String dataverseName, String feedName,
+            List<FeedConnection> feedConnections) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFeedReadLock(locks, feedName);
+        for (FeedConnection feedConnection : feedConnections) {
+            // TODO: handle the case where the connected dataset lives in a different dataverse
+            String fqName = dataverseName + "." + feedConnection.getDatasetName();
+            acquireDatasetReadLock(locks, fqName);
+        }
+    }
+
+    public void stopFeedBegin(LockList locks, String dataverseName, String feedName) {
+        // No dataset lock is needed here: the datasets are protected by the active event listener
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFeedReadLock(locks, feedName);
+    }
+
+    public void createFeedBegin(LockList locks, String dataverseName, String feedFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFeedWriteLock(locks, feedFullyQualifiedName);
+    }
+
+    public void connectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName,
+            String feedFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+        acquireFeedReadLock(locks, feedFullyQualifiedName);
+    }
+
+    public void createFeedPolicyBegin(LockList locks, String dataverseName, String policyName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireFeedPolicyWriteLock(locks, policyName);
+    }
+
+    public void disconnectFeedBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName,
+            String feedFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+        acquireFeedReadLock(locks, feedFullyQualifiedName);
+    }
+
+    public void compactBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireDatasetReadLock(locks, datasetFullyQualifiedName);
+    }
+
+    public void refreshDatasetBegin(LockList locks, String dataverseName, String datasetFullyQualifiedName) {
+        acquireDataverseReadLock(locks, dataverseName);
+        acquireExternalDatasetRefreshLock(locks, datasetFullyQualifiedName);
+    }
+}
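Putting the pieces together, a hedged sketch of the calling convention the translator is expected to follow (the dataverse and dataset names are illustrative): each statement collects its locks in one LockList via the singleton manager and unwinds them in a finally block.

    package org.apache.asterix.metadata.lock;

    public class CreateIndexLockingDemo {
        public static void main(String[] args) {
            LockList locks = new LockList();
            try {
                // Takes the dataverse read lock, then the dataset's INDEX_BUILD lock.
                MetadataLockManager.INSTANCE.createIndexBegin(locks, "Default", "Default.Tweets");
                // compile and run the CREATE INDEX job
            } finally {
                locks.unlock();
                locks.reset();
            }
        }
    }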
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java
deleted file mode 100644
index d53191c..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetLock.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.asterix.om.base.AMutableInt32;
-
-public class DatasetLock {
-
-    private ReentrantReadWriteLock dsLock;
-    private ReentrantReadWriteLock dsModifyLock;
-    private AMutableInt32 indexBuildCounter;
-
-    public DatasetLock() {
-        dsLock = new ReentrantReadWriteLock(true);
-        dsModifyLock = new ReentrantReadWriteLock(true);
-        indexBuildCounter = new AMutableInt32(0);
-    }
-
-    public void acquireReadLock() {
-        // query
-        // build index
-        // insert
-        dsLock.readLock().lock();
-    }
-
-    public void releaseReadLock() {
-        // query
-        // build index
-        // insert
-        dsLock.readLock().unlock();
-    }
-
-    public void acquireWriteLock() {
-        // create ds
-        // delete ds
-        // drop index
-        dsLock.writeLock().lock();
-    }
-
-    public void releaseWriteLock() {
-        // create ds
-        // delete ds
-        // drop index
-        dsLock.writeLock().unlock();
-    }
-
-    public void acquireReadModifyLock() {
-        // insert
-        dsModifyLock.readLock().lock();
-    }
-
-    public void releaseReadModifyLock() {
-        // insert
-        dsModifyLock.readLock().unlock();
-    }
-
-    public void acquireWriteModifyLock() {
-        // Build index statement
-        synchronized (indexBuildCounter) {
-            if (indexBuildCounter.getIntegerValue() > 0) {
-                indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() + 1);
-            } else {
-                dsModifyLock.writeLock().lock();
-                indexBuildCounter.setValue(1);
-            }
-        }
-    }
-
-    public void releaseWriteModifyLock() {
-        // Build index statement
-        synchronized (indexBuildCounter) {
-            if (indexBuildCounter.getIntegerValue() == 1) {
-                dsModifyLock.writeLock().unlock();
-            }
-            indexBuildCounter.setValue(indexBuildCounter.getIntegerValue() - 1);
-        }
-    }
-
-    public void acquireRefreshLock() {
-        // Refresh External Dataset statement
-        dsModifyLock.writeLock().lock();
-    }
-
-    public void releaseRefreshLock() {
-        // Refresh External Dataset statement
-        dsModifyLock.writeLock().unlock();
-    }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 572cc75..ca56cc3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -94,8 +94,8 @@
     private DatasetUtil() {
     }
 
-    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset,
-            ARecordType itemType, ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
+    public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
+            ARecordType metaItemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
             throws AlgebricksException {
         List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
         IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
@@ -181,8 +181,7 @@
 
     public static List<List<String>> getPartitioningKeys(Dataset dataset) {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-            return IndexingConstants
-                    .getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+            return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
         }
         return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
     }
@@ -276,8 +275,7 @@
         String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
         ILSMMergePolicyFactory mergePolicyFactory;
         try {
-            mergePolicyFactory =
-                    (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
+            mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
             if (mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
                 ((CorrelatedPrefixMergePolicyFactory) mergePolicyFactory).setDatasetID(dataset.getDatasetId());
             }
@@ -338,8 +336,8 @@
                 metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
-        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
-                metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
         IndexDropOperatorDescriptor primaryBtreeDrop =
                 new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
@@ -393,8 +391,8 @@
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
 
-        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
-                metadataProvider, primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(metadataProvider,
+                primaryIndex, itemType, metaType, compactionInfo.first, compactionInfo.second);
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
         IndexDropOperatorDescriptor primaryBtreeDrop =
                 new IndexDropOperatorDescriptor(specPrimary, storageComponentProvider.getStorageManager(),
@@ -467,8 +465,8 @@
                 index, itemType, metaItemType, compactionInfo.first, compactionInfo.second);
         TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
                 RuntimeComponentsProvider.RUNTIME_PROVIDER, RuntimeComponentsProvider.RUNTIME_PROVIDER,
-                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
-                dataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
+                splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory,
+                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE,
                 storageComponentProvider.getMetadataPageManagerFactory());
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);
@@ -518,4 +516,13 @@
         spec.addRoot(compactOp);
         return spec;
     }
+
+    public static boolean isFullyQualifiedName(String datasetName) {
+        return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a .
+    }
+
+    public static String getDataverseFromFullyQualifiedName(String datasetName) {
+        int idx = datasetName.indexOf('.');
+        return datasetName.substring(0, idx);
+    }
 }
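A hedged example of the two new helpers (values are illustrative): isFullyQualifiedName requires a '.' at an index greater than zero, so a name such as ".Tweets" is rejected, and getDataverseFromFullyQualifiedName assumes that check already passed.

    package org.apache.asterix.metadata.utils;

    public class FullyQualifiedNameDemo {
        public static void main(String[] args) {
            String name = "Default.Tweets";
            if (DatasetUtil.isFullyQualifiedName(name)) {
                // prints "Default"; calling this without the check above would throw
                // for an unqualified name, since indexOf('.') returns -1
                System.out.println(DatasetUtil.getDataverseFromFullyQualifiedName(name));
            }
        }
    }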
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
deleted file mode 100644
index 9292008..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.utils;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.FeedConnection;
-
-public class MetadataLockManager {
-
-    public static final MetadataLockManager INSTANCE = new MetadataLockManager();
-    private static final Function<String, ReentrantReadWriteLock> LOCK_FUNCTION = key -> new ReentrantReadWriteLock();
-    private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
-    private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
-    private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks;
-
-    private MetadataLockManager() {
-        dataversesLocks = new ConcurrentHashMap<>();
-        datasetsLocks = new ConcurrentHashMap<>();
-        functionsLocks = new ConcurrentHashMap<>();
-        nodeGroupsLocks = new ConcurrentHashMap<>();
-        feedsLocks = new ConcurrentHashMap<>();
-        feedPolicyLocks = new ConcurrentHashMap<>();
-        compactionPolicyLocks = new ConcurrentHashMap<>();
-        dataTypeLocks = new ConcurrentHashMap<>();
-        extensionLocks = new ConcurrentHashMap<>();
-    }
-
-    public void acquireDataverseReadLock(String dataverseName) {
-        ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
-        dvLock.readLock().lock();
-    }
-
-    public void releaseDataverseReadLock(String dataverseName) {
-        dataversesLocks.get(dataverseName).readLock().unlock();
-    }
-
-    public void acquireDataverseWriteLock(String dataverseName) {
-        ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
-        dvLock.writeLock().lock();
-    }
-
-    public void releaseDataverseWriteLock(String dataverseName) {
-        dataversesLocks.get(dataverseName).writeLock().unlock();
-    }
-
-    public void acquireDatasetReadLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-    }
-
-    public void releaseDatasetReadLock(String datasetName) {
-        datasetsLocks.get(datasetName).releaseReadLock();
-    }
-
-    public void acquireDatasetWriteLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireWriteLock();
-    }
-
-    public void releaseDatasetWriteLock(String datasetName) {
-        datasetsLocks.get(datasetName).releaseWriteLock();
-    }
-
-    public void acquireDatasetModifyLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-        dsLock.acquireReadModifyLock();
-    }
-
-    public void releaseDatasetModifyLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        dsLock.releaseReadModifyLock();
-        dsLock.releaseReadLock();
-    }
-
-    public void acquireDatasetCreateIndexLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-        dsLock.acquireWriteModifyLock();
-    }
-
-    public void releaseDatasetCreateIndexLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        dsLock.releaseWriteModifyLock();
-        dsLock.releaseReadLock();
-    }
-
-    public void acquireExternalDatasetRefreshLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
-        dsLock.acquireReadLock();
-        dsLock.acquireRefreshLock();
-    }
-
-    public void releaseExternalDatasetRefreshLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        dsLock.releaseRefreshLock();
-        dsLock.releaseReadLock();
-    }
-
-    public void acquireFunctionReadLock(String functionName) {
-        ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
-        fLock.readLock().lock();
-    }
-
-    public void releaseFunctionReadLock(String functionName) {
-        functionsLocks.get(functionName).readLock().unlock();
-    }
-
-    public void acquireFunctionWriteLock(String functionName) {
-        ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
-        fLock.writeLock().lock();
-    }
-
-    public void releaseFunctionWriteLock(String functionName) {
-        functionsLocks.get(functionName).writeLock().unlock();
-    }
-
-    public void acquireNodeGroupReadLock(String nodeGroupName) {
-        ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
-        ngLock.readLock().lock();
-    }
-
-    public void releaseNodeGroupReadLock(String nodeGroupName) {
-        nodeGroupsLocks.get(nodeGroupName).readLock().unlock();
-    }
-
-    public void acquireNodeGroupWriteLock(String nodeGroupName) {
-        ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
-        ngLock.writeLock().lock();
-    }
-
-    public void releaseNodeGroupWriteLock(String nodeGroupName) {
-        nodeGroupsLocks.get(nodeGroupName).writeLock().unlock();
-    }
-
-    public void acquireFeedReadLock(String feedName) {
-        ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
-        fLock.readLock().lock();
-    }
-
-    public void releaseFeedReadLock(String feedName) {
-        feedsLocks.get(feedName).readLock().unlock();
-    }
-
-    public void acquireFeedWriteLock(String feedName) {
-        ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
-        fLock.writeLock().lock();
-    }
-
-    public void releaseFeedWriteLock(String feedName) {
-        feedsLocks.get(feedName).writeLock().unlock();
-    }
-
-    public void acquireFeedPolicyWriteLock(String policyName) {
-        ReentrantReadWriteLock fLock = feedPolicyLocks.computeIfAbsent(policyName, LOCK_FUNCTION);
-        fLock.writeLock().lock();
-    }
-
-    public void releaseFeedPolicyWriteLock(String policyName) {
-        feedPolicyLocks.get(policyName).writeLock().unlock();
-    }
-
-    public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
-        ReentrantReadWriteLock compactionPolicyLock =
-                compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
-        compactionPolicyLock.readLock().lock();
-    }
-
-    public void releaseCompactionPolicyReadLock(String compactionPolicyName) {
-        compactionPolicyLocks.get(compactionPolicyName).readLock().unlock();
-    }
-
-    public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
-        ReentrantReadWriteLock compactionPolicyLock =
-                compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
-        compactionPolicyLock.writeLock().lock();
-    }
-
-    public void releaseCompactionPolicyWriteLock(String compactionPolicyName) {
-        compactionPolicyLocks.get(compactionPolicyName).writeLock().unlock();
-    }
-
-    public void acquireDataTypeReadLock(String dataTypeName) {
-        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
-        dataTypeLock.readLock().lock();
-    }
-
-    public void releaseDataTypeReadLock(String dataTypeName) {
-        dataTypeLocks.get(dataTypeName).readLock().unlock();
-    }
-
-    public void acquireDataTypeWriteLock(String dataTypeName) {
-        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
-        dataTypeLock.writeLock().lock();
-    }
-
-    public void releaseDataTypeWriteLock(String dataTypeName) {
-        dataTypeLocks.get(dataTypeName).writeLock().unlock();
-    }
-
-    public void createDatasetBegin(String dataverseName, String itemTypeDataverseName,
-            String itemTypeFullyQualifiedName, String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName,
-            String nodeGroupName, String compactionPolicyName, String datasetFullyQualifiedName,
-            boolean isDefaultCompactionPolicy) {
-        acquireDataverseReadLock(dataverseName);
-        if (!dataverseName.equals(itemTypeDataverseName)) {
-            acquireDataverseReadLock(itemTypeDataverseName);
-        }
-        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
-                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
-            acquireDataverseReadLock(metaItemTypeDataverseName);
-        }
-        acquireDataTypeReadLock(itemTypeFullyQualifiedName);
-        if (metaItemTypeFullyQualifiedName != null
-                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
-            acquireDataTypeReadLock(metaItemTypeFullyQualifiedName);
-        }
-        acquireNodeGroupReadLock(nodeGroupName);
-        if (!isDefaultCompactionPolicy) {
-            acquireCompactionPolicyReadLock(compactionPolicyName);
-        }
-        acquireDatasetWriteLock(datasetFullyQualifiedName);
-    }
-
-    public void createDatasetEnd(String dataverseName, String itemTypeDataverseName, String itemTypeFullyQualifiedName,
-            String metaItemTypeDataverseName, String metaItemTypeFullyQualifiedName, String nodeGroupName,
-            String compactionPolicyName, String datasetFullyQualifiedName, boolean isDefaultCompactionPolicy) {
-        releaseDatasetWriteLock(datasetFullyQualifiedName);
-        if (!isDefaultCompactionPolicy) {
-            releaseCompactionPolicyReadLock(compactionPolicyName);
-        }
-        releaseNodeGroupReadLock(nodeGroupName);
-        if (metaItemTypeFullyQualifiedName != null
-                && !metaItemTypeFullyQualifiedName.equals(itemTypeFullyQualifiedName)) {
-            releaseDataTypeReadLock(metaItemTypeFullyQualifiedName);
-        }
-        releaseDataTypeReadLock(itemTypeFullyQualifiedName);
-        if (metaItemTypeDataverseName != null && !metaItemTypeDataverseName.equals(dataverseName)
-                && !metaItemTypeDataverseName.equals(itemTypeDataverseName)) {
-            releaseDataverseReadLock(metaItemTypeDataverseName);
-        }
-        if (!dataverseName.equals(itemTypeDataverseName)) {
-            releaseDataverseReadLock(itemTypeDataverseName);
-        }
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void createIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetCreateIndexLock(datasetFullyQualifiedName);
-    }
-
-    public void createIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetCreateIndexLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void createTypeBegin(String dataverseName, String itemTypeFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDataTypeWriteLock(itemTypeFullyQualifiedName);
-    }
-
-    public void createTypeEnd(String dataverseName, String itemTypeFullyQualifiedName) {
-        releaseDataTypeWriteLock(itemTypeFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetWriteLock(datasetFullyQualifiedName);
-    }
-
-    public void dropDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetWriteLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropIndexBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetWriteLock(datasetFullyQualifiedName);
-    }
-
-    public void dropIndexEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetWriteLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropTypeBegin(String dataverseName, String dataTypeFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDataTypeWriteLock(dataTypeFullyQualifiedName);
-    }
-
-    public void dropTypeEnd(String dataverseName, String dataTypeFullyQualifiedName) {
-        releaseDataTypeWriteLock(dataTypeFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void functionStatementBegin(String dataverseName, String functionFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFunctionWriteLock(functionFullyQualifiedName);
-    }
-
-    public void functionStatementEnd(String dataverseName, String functionFullyQualifiedName) {
-        releaseFunctionWriteLock(functionFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void modifyDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetModifyLock(datasetFullyQualifiedName);
-    }
-
-    public void modifyDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetModifyLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
-            List<String> datasets) {
-        dataverses.add(dataverseName);
-        datasets.add(datasetFullyQualifiedName);
-        Collections.sort(dataverses);
-        Collections.sort(datasets);
-
-        String previous = null;
-        for (int i = 0; i < dataverses.size(); i++) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                acquireDataverseReadLock(current);
-                previous = current;
-            }
-        }
-
-        for (int i = 0; i < datasets.size(); i++) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                if (current.equals(datasetFullyQualifiedName)) {
-                    acquireDatasetModifyLock(current);
-                } else {
-                    acquireDatasetReadLock(current);
-                }
-                previous = current;
-            }
-        }
-    }
-
-    public void insertDeleteUpsertEnd(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
-            List<String> datasets) {
-        String previous = null;
-        for (int i = dataverses.size() - 1; i >= 0; i--) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                releaseDataverseReadLock(current);
-                previous = current;
-            }
-        }
-        for (int i = datasets.size() - 1; i >= 0; i--) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                if (current.equals(datasetFullyQualifiedName)) {
-                    releaseDatasetModifyLock(current);
-                } else {
-                    releaseDatasetReadLock(current);
-                }
-                previous = current;
-            }
-        }
-    }
-
-    public void dropFeedBegin(String dataverseName, String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedWriteLock(feedFullyQualifiedName);
-    }
-
-    public void dropFeedEnd(String dataverseName, String feedFullyQualifiedName) {
-        releaseFeedWriteLock(feedFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void dropFeedPolicyBegin(String dataverseName, String policyName) {
-        acquireFeedWriteLock(policyName);
-        acquireDataverseReadLock(dataverseName);
-    }
-
-    public void dropFeedPolicyEnd(String dataverseName, String policyName) {
-        releaseFeedWriteLock(policyName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void startFeedBegin(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedReadLock(feedName);
-        for (FeedConnection feedConnection : feedConnections) {
-            acquireDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
-        }
-    }
-
-    public void startFeedEnd(String dataverseName, String feedName, List<FeedConnection> feedConnections) {
-        releaseDataverseReadLock(dataverseName);
-        releaseFeedReadLock(feedName);
-        for (FeedConnection feedConnection : feedConnections) {
-            releaseDatasetReadLock(dataverseName + "." + feedConnection.getDatasetName());
-        }
-    }
-
-    public void StopFeedBegin(String dataverseName, String feedName) {
-        // TODO: dataset lock?
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedReadLock(feedName);
-    }
-
-    public void StopFeedEnd(String dataverseName, String feedName) {
-        releaseDataverseReadLock(dataverseName);
-        releaseFeedReadLock(feedName);
-    }
-
-    public void createFeedBegin(String dataverseName, String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedWriteLock(feedFullyQualifiedName);
-    }
-
-    public void createFeedEnd(String dataverseName, String feedFullyQualifiedName) {
-        releaseFeedWriteLock(feedFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void connectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-        acquireFeedReadLock(feedFullyQualifiedName);
-    }
-
-    public void connectFeedEnd(String dataverseName, String datasetFullyQualifiedName, String feedFullyQualifiedName) {
-        releaseFeedReadLock(feedFullyQualifiedName);
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void createFeedPolicyBegin(String dataverseName, String policyName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireFeedPolicyWriteLock(policyName);
-    }
-
-    public void createFeedPolicyEnd(String dataverseName, String policyName) {
-        releaseFeedPolicyWriteLock(policyName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void disconnectFeedBegin(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-        acquireFeedReadLock(feedFullyQualifiedName);
-    }
-
-    public void disconnectFeedEnd(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        releaseFeedReadLock(feedFullyQualifiedName);
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void subscribeFeedBegin(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-        acquireFeedReadLock(feedFullyQualifiedName);
-    }
-
-    public void subscribeFeedEnd(String dataverseName, String datasetFullyQualifiedName,
-            String feedFullyQualifiedName) {
-        releaseFeedReadLock(feedFullyQualifiedName);
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void compactBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireDatasetReadLock(datasetFullyQualifiedName);
-    }
-
-    public void compactEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseDatasetReadLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void queryBegin(Dataverse dataverse, List<String> dataverses, List<String> datasets) {
-        if (dataverse != null) {
-            dataverses.add(dataverse.getDataverseName());
-        }
-        Collections.sort(dataverses);
-        Collections.sort(datasets);
-
-        String previous = null;
-        for (int i = 0; i < dataverses.size(); i++) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                acquireDataverseReadLock(current);
-                previous = current;
-            }
-        }
-
-        for (int i = 0; i < datasets.size(); i++) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                acquireDatasetReadLock(current);
-                previous = current;
-            }
-        }
-    }
-
-    public void queryEnd(List<String> dataverses, List<String> datasets) {
-        String previous = null;
-        for (int i = dataverses.size() - 1; i >= 0; i--) {
-            String current = dataverses.get(i);
-            if (!current.equals(previous)) {
-                releaseDataverseReadLock(current);
-                previous = current;
-            }
-        }
-        for (int i = datasets.size() - 1; i >= 0; i--) {
-            String current = datasets.get(i);
-            if (!current.equals(previous)) {
-                releaseDatasetReadLock(current);
-                previous = current;
-            }
-        }
-    }
-
-    public void refreshDatasetBegin(String dataverseName, String datasetFullyQualifiedName) {
-        acquireDataverseReadLock(dataverseName);
-        acquireExternalDatasetRefreshLock(datasetFullyQualifiedName);
-    }
-
-    public void refreshDatasetEnd(String dataverseName, String datasetFullyQualifiedName) {
-        releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
-        releaseDataverseReadLock(dataverseName);
-    }
-
-    public void acquireExtensionReadLock(String entityName) {
-        ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
-        entityLock.readLock().lock();
-    }
-
-    public void releaseExtensionReadLock(String entityName) {
-        extensionLocks.get(entityName).readLock().unlock();
-    }
-
-    public void acquireExtensionWriteLock(String entityName) {
-        ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
-        entityLock.writeLock().lock();
-    }
-
-    public void releaseExtensionWriteLock(String entityName) {
-        extensionLocks.get(entityName).writeLock().unlock();
-    }
-}
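
A note on the pattern in the removed queryBegin/queryEnd pair (presumably relocated, not deleted, by this change): both lists of entity names are sorted and de-duplicated before any lock is taken, so every statement acquires its read locks in a single global order, which rules out lock-ordering deadlocks between concurrent statements; release then walks the sorted lists backwards. A minimal, self-contained sketch of the idiom, using illustrative names (SortedLockSketch, lockAll, unlockAll are not AsterixDB API):

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class SortedLockSketch {
        private final ConcurrentHashMap<String, ReentrantReadWriteLock> locks =
                new ConcurrentHashMap<>();

        public void lockAll(List<String> names) {
            Collections.sort(names);             // global order => no deadlock
            String previous = null;
            for (String current : names) {
                if (!current.equals(previous)) { // sorted, so duplicates are adjacent
                    locks.computeIfAbsent(current, k -> new ReentrantReadWriteLock())
                            .readLock().lock();
                    previous = current;
                }
            }
        }

        public void unlockAll(List<String> names) { // names already sorted by lockAll
            String previous = null;
            for (int i = names.size() - 1; i >= 0; i--) {
                String current = names.get(i);
                if (!current.equals(previous)) {
                    locks.get(current).readLock().unlock();
                    previous = current;
                }
            }
        }
    }

The `previous` check only skips adjacent duplicates, which is exactly what sorting makes sufficient; it also keeps acquire and release counts balanced, even though ReentrantReadWriteLock read locks are reentrant and would tolerate a double acquire.
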
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
index 8859b9d..45034de 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/RTreeDataflowHelperFactoryProvider.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
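
The only change shown for RTreeDataflowHelperFactoryProvider is the new import; presumably the body of the file, outside this excerpt, now consults ExternalDatasetsRegistry, which tracks version numbers for external datasets so that refresh and query jobs can agree on which files-index version to read.
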
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 35e7acb..a4b849f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -48,19 +48,19 @@
         ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons();
         String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
         for (int j = 0; j < clusterPartition.length; j++) {
-                File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                    clusterPartition[j].getPartitionId()) + File.separator + relPathFile);
-                splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
+            File f = new File(
+                    StoragePathUtil.prepareStoragePartitionPath(storageDirName, clusterPartition[j].getPartitionId())
+                            + File.separator + relPathFile);
+            splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
         }
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileSplit[] getDatasetSplits(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+    public static FileSplit[] getDatasetSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
+            String targetIdxName, boolean temp) throws AlgebricksException {
         try {
-            File relPathFile =
-                    new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                    dataset.getDatasetName(), targetIdxName));
             List<String> nodeGroup =
                     MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
             if (nodeGroup == null) {
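
getDatasetSplits now takes the already-resolved Dataset instead of a (dataverse, dataset) name pair, dropping the MetadataManager.getDataset lookup from inside the utility. That matches the theme of the change: the statement resolves each dataset once, under its acquired metadata locks, and threads the entity through. A hypothetical call site, with illustrative variable names:

    Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
    if (dataset == null) {
        throw new AlgebricksException(
                "Dataset " + datasetName + " does not exist in dataverse " + dataverseName);
    }
    FileSplit[] splits =
            SplitsAndConstraintsUtil.getDatasetSplits(dataset, mdTxnCtx, targetIdxName, temp);
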
@@ -92,12 +92,11 @@
         }
     }
 
-    private static FileSplit[] getFilesIndexSplits(MetadataTransactionContext mdTxnCtx, String dataverseName,
-            String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
+    private static FileSplit[] getFilesIndexSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
+            String targetIdxName, boolean create) throws AlgebricksException {
         try {
-            File relPathFile =
-                    new File(StoragePathUtil.prepareDataverseIndexName(dataverseName, datasetName, targetIdxName));
-            Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+            File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                    dataset.getDatasetName(), targetIdxName));
             List<String> nodeGroup =
                     MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
             if (nodeGroup == null) {
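
The private getFilesIndexSplits gets the same treatment: the caller supplies the Dataset, and the relative path is derived from dataset.getDataverseName() and dataset.getDatasetName() rather than from separate name parameters.
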
@@ -114,14 +113,14 @@
                     // Only the first partition when create
                     File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
                             nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
-                    splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
-                            .getPath()));
+                    splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition],
+                            f.getPath()));
                 } else {
                     for (int k = 0; k < nodePartitions.length; k++) {
                         File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
                                 nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
-                        splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
-                                .getPath()));
+                        splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition],
+                                f.getPath()));
                     }
                 }
             }
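
This hunk only rewraps lines; behavior is unchanged. Worth noting while reading: the else branch loops k over nodePartitions, but both the old and the new text index nodePartitions[firstPartition] inside the loop body. If the intent is one split per node partition, the per-partition reading would look like the following (a hedged sketch of that reading, not what the patch text does):

    for (int k = 0; k < nodePartitions.length; k++) {
        // use the loop index, not firstPartition, for each partition's split
        File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
                nodePartitions[k].getPartitionId()) + File.separator + relPathFile);
        splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath()));
    }
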
@@ -138,9 +137,9 @@
     }
 
     public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getFilesIndexSplitProviderAndConstraints(
-            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String targetIdxName,
-            boolean create) throws AlgebricksException {
-        FileSplit[] splits = getFilesIndexSplits(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
+            Dataset dataset, MetadataTransactionContext mdTxnCtx, String targetIdxName, boolean create)
+            throws AlgebricksException {
+        FileSplit[] splits = getFilesIndexSplits(dataset, mdTxnCtx, targetIdxName, create);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
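
Finally, the public entry point mirrors the new parameter order. A hypothetical use, with illustrative variables:

    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spc =
            SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(
                    dataset, mdTxnCtx, filesIndexName, false /* create */);
    IFileSplitProvider provider = spc.first;
    AlgebricksPartitionConstraint constraint = spc.second;

Passing the Dataset first keeps the signature consistent with getDatasetSplits above and avoids a second metadata read while the job is being compiled.
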