[ASTERIXDB-3259][MTD] Change metadata manager APIs to accept 'database'

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Change metadata manager APIs to accept 'database'.
The 'null' that is passed for 'database' from callers
will be changed in subsequent patches when 'database' is enabled.

Change-Id: I4eda46bbe2b8a6ff9e136de71856b3f62fdc0e9b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17783
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 5e66f68..b83c8f5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -164,7 +164,7 @@
         IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
         int numPrimaryKeys = dataset.getPrimaryKeys().size();
         Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+                dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         if (secondaryIndex == null) {
             throw new AlgebricksException(
                     "Code generation error: no index " + indexName + " for " + dataset() + " " + datasetName);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
index f84a3b7..033e077 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/TypeTranslator.java
@@ -146,7 +146,7 @@
         // solve remaining top level references
         for (TypeSignature typeSignature : incompleteTopLevelTypeReferences.keySet()) {
             IAType t;
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeSignature.getDataverseName(),
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, typeSignature.getDataverseName(),
                     typeSignature.getName());
             if (dt == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeSignature.getName());
@@ -160,9 +160,9 @@
         // solve remaining field type references
         for (String trefName : incompleteFieldTypes.keySet()) {
             IAType t;
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeDataverse, trefName);
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, typeDataverse, trefName);
             if (dt == null) {
-                dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                         trefName);
             }
             if (dt == null) {
@@ -190,7 +190,7 @@
             IAType t;
             Datatype dt;
             if (MetadataManager.INSTANCE != null) {
-                dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, typeSignature.getDataverseName(),
+                dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, typeSignature.getDataverseName(),
                         typeSignature.getName());
                 if (dt == null) {
                     throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeSignature.getName());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 58a37bd..68b93f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -249,7 +249,7 @@
     private Iterable<Dataset> getDatasetsInDataverseForRebalance(DataverseName dvName,
             MetadataTransactionContext mdTxnCtx) throws Exception {
         return MetadataConstants.METADATA_DATAVERSE_NAME.equals(dvName) ? Collections.emptyList()
-                : IterableUtils.filteredIterable(MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName),
+                : IterableUtils.filteredIterable(MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, null, dvName),
                         DatasetUtil::isNotView);
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 3456427..d32d7a9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -594,7 +594,9 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             DataverseName dvName = stmtUseDataverse.getDataverseName();
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            String database = null;
+            Dataverse dv =
+                    MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), database, dvName);
             if (dv == null) {
                 if (stmtUseDataverse.getIfExists()) {
                     if (warningCollector.shouldWarn()) {
@@ -639,7 +641,9 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             DataverseName dvName = stmtCreateDataverse.getDataverseName();
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
+            String database = null;
+            Dataverse dv =
+                    MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), database, dvName);
             if (dv != null) {
                 if (stmtCreateDataverse.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -662,7 +666,8 @@
     protected static void validateCompactionPolicy(String compactionPolicy,
             Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx,
             boolean isExternalDataset, SourceLocation sourceLoc) throws Exception {
-        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+        String database = null;
+        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, database,
                 MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
         if (compactionPolicyEntity == null) {
             throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -761,6 +766,7 @@
             TypeExpression itemTypeExpr, String itemTypeName, TypeExpression metaItemTypeExpr,
             DataverseName metaItemTypeDataverseName, String metaItemTypeName, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
+        String database = null;
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         SourceLocation sourceLoc = dd.getSourceLocation();
         DatasetType dsType = dd.getDatasetType();
@@ -781,7 +787,7 @@
                 storageProperties.getColumnMaxTupleCount(), storageProperties.getColumnFreeSpaceTolerance());
         try {
             // Check if the dataverse exists
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
@@ -921,8 +927,8 @@
 
             // #. add a new dataset with PendingNoOp after deleting the dataset with
             // PendingAddOp
-            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-                    requestParameters.isForceDropDataset());
+            MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), database, dataverseName,
+                    datasetName, requestParameters.isForceDropDataset());
             dataset.setPendingOp(MetadataUtil.PENDING_NO_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -959,15 +965,15 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName,
+                    MetadataManager.INSTANCE.dropDataset(mdTxnCtx, database, dataverseName, datasetName,
                             requestParameters.isForceDropDataset());
                     if (itemTypeAdded) {
-                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, itemTypeEntity.getDataverseName(),
-                                itemTypeEntity.getDatatypeName());
+                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, itemTypeEntity.getDatabaseName(),
+                                itemTypeEntity.getDataverseName(), itemTypeEntity.getDatatypeName());
                     }
                     if (metaItemTypeAdded) {
-                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, metaItemTypeEntity.getDataverseName(),
-                                metaItemTypeEntity.getDatatypeName());
+                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, metaItemTypeEntity.getDatabaseName(),
+                                metaItemTypeEntity.getDataverseName(), metaItemTypeEntity.getDatatypeName());
                     }
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
@@ -1127,13 +1133,14 @@
     protected void doCreateIndex(MetadataProvider metadataProvider, CreateIndexStatement stmtCreateIndex,
             DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = stmtCreateIndex.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             // Check if the dataverse exists
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
@@ -1152,8 +1159,8 @@
             validateIndexType(datasetType, indexType, isSecondaryPrimary, sourceLoc);
 
             String indexName = stmtCreateIndex.getIndexName().getValue();
-            Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName, indexName);
+            Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), database,
+                    dataverseName, datasetName, indexName);
             if (index != null) {
                 if (stmtCreateIndex.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1163,8 +1170,9 @@
                 }
             }
 
+            String itemTypeDatabase = null;
             Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    ds.getItemTypeDataverseName(), ds.getItemTypeName());
+                    itemTypeDatabase, ds.getItemTypeDataverseName(), ds.getItemTypeName());
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
             /* TODO: unused for now becase indexes on meta are disabled -- see below
             ARecordType metaRecordType = null;
@@ -1480,7 +1488,7 @@
     protected void doCreateFullTextFilter(MetadataProvider metadataProvider,
             CreateFullTextFilterStatement stmtCreateFilter, DataverseName dataverseName) throws Exception {
         AbstractFullTextFilterDescriptor filterDescriptor;
-
+        String database = null;
         String filterType = stmtCreateFilter.getFilterType();
         if (filterType == null) {
             throw new CompilationException(ErrorCode.PARSE_ERROR, stmtCreateFilter.getSourceLocation(),
@@ -1498,7 +1506,7 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, stmtCreateFilter.getSourceLocation(),
                         dataverseName);
@@ -1506,7 +1514,7 @@
 
             String filterName = stmtCreateFilter.getFilterName();
             FullTextFilterMetadataEntity existingFilter =
-                    MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, dataverseName, filterName);
+                    MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, database, dataverseName, filterName);
             if (existingFilter != null) {
                 if (stmtCreateFilter.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1549,13 +1557,13 @@
     protected void doCreateFullTextConfig(MetadataProvider metadataProvider,
             CreateFullTextConfigStatement stmtCreateConfig, DataverseName dataverseName, String configName,
             ImmutableList<String> filterNames) throws Exception {
-
+        String database = null;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
         try {
             FullTextConfigMetadataEntity existingConfig =
-                    MetadataManager.INSTANCE.getFullTextConfig(mdTxnCtx, dataverseName, configName);
+                    MetadataManager.INSTANCE.getFullTextConfig(mdTxnCtx, database, dataverseName, configName);
             if (existingConfig != null) {
                 if (stmtCreateConfig.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1569,7 +1577,7 @@
             ImmutableList.Builder<IFullTextFilterDescriptor> filterDescriptorsBuilder = ImmutableList.builder();
             for (String filterName : filterNames) {
                 FullTextFilterMetadataEntity filterMetadataEntity =
-                        MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, dataverseName, filterName);
+                        MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, database, dataverseName, filterName);
                 if (filterMetadataEntity == null) {
                     throw new CompilationException(ErrorCode.FULL_TEXT_FILTER_NOT_FOUND,
                             stmtCreateConfig.getSourceLocation(), filterName);
@@ -1622,8 +1630,9 @@
                     default:
                         throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, sourceLoc, "");
                 }
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
-                        metadataProvider.getMetadataTxnContext(), index.getDataverseName(), index.getDatasetName());
+                List<Index> indexes =
+                        MetadataManager.INSTANCE.getDatasetIndexes(metadataProvider.getMetadataTxnContext(),
+                                index.getDatabaseName(), index.getDataverseName(), index.getDatasetName());
                 for (Index existingIndex : indexes) {
                     if (!existingIndex.isEnforced()) {
                         continue;
@@ -1702,8 +1711,8 @@
 
             // #. add another new index with PendingNoOp after deleting the index with
             // PendingAddOp
-            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
-                    index.getDatasetName(), index.getIndexName());
+            MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDatabaseName(),
+                    index.getDataverseName(), index.getDatasetName(), index.getIndexName());
             index.setPendingOp(MetadataUtil.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1735,7 +1744,8 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
                     MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                            index.getDataverseName(), index.getDatasetName(), index.getIndexName());
+                            index.getDatabaseName(), index.getDataverseName(), index.getDatasetName(),
+                            index.getIndexName());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -1771,6 +1781,7 @@
         SourceLocation sourceLoc = stmtCreateType.getSourceLocation();
         String typeName = stmtCreateType.getIdent().getValue();
         metadataProvider.validateDatabaseObjectName(stmtCreateType.getDataverseName(), typeName, sourceLoc);
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(stmtCreateType.getDataverseName());
         if (isCompileOnly()) {
             return;
@@ -1779,11 +1790,11 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.createTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName);
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, database, dataverseName, typeName);
             if (dt != null) {
                 if (!stmtCreateType.getIfNotExists()) {
                     throw new CompilationException(ErrorCode.TYPE_EXISTS, sourceLoc, typeName);
@@ -1843,6 +1854,7 @@
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         SourceLocation sourceLoc = stmtDropDataverse.getSourceLocation();
         DataverseName dataverseName = stmtDropDataverse.getDataverseName();
+        String database = null;
         ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
@@ -1850,7 +1862,7 @@
         List<FeedEventsListener> feedsToStop = new ArrayList<>();
         List<JobSpecification> jobsToExecute = new ArrayList<>();
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 if (stmtDropDataverse.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1860,7 +1872,7 @@
                 }
             }
 
-            if (stmtDropDataverse.getIfEmpty() && isDataverseNotEmpty(dataverseName, mdTxnCtx)) {
+            if (stmtDropDataverse.getIfEmpty() && isDataverseNotEmpty(database, dataverseName, mdTxnCtx)) {
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 return false;
             }
@@ -1883,14 +1895,14 @@
             }
 
             // #. prepare jobs which will drop corresponding datasets with indexes.
-            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
+            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, database, dataverseName);
             for (Dataset dataset : datasets) {
                 String datasetName = dataset.getDatasetName();
                 DatasetType dsType = dataset.getDatasetType();
                 switch (dsType) {
                     case INTERNAL:
-                        List<Index> indexes =
-                                MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+                        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, database,
+                                dataverseName, datasetName);
                         for (Index index : indexes) {
                             jobsToExecute
                                     .add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc));
@@ -1903,7 +1915,7 @@
             }
 
             // #. prepare jobs which will drop corresponding libraries.
-            List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName);
+            List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, database, dataverseName);
             for (Library library : libraries) {
                 jobsToExecute.add(ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, library.getName(),
                         metadataProvider));
@@ -1915,7 +1927,7 @@
             // first, deleting the dataverse record from the DATAVERSE_DATASET
             // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
             // Note: the delete operation fails if the dataverse cannot be deleted due to metadata dependencies
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, database, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), MetadataUtil.PENDING_DROP_OP));
 
@@ -1939,7 +1951,7 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. finally, delete the dataverse.
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, database, dataverseName);
 
             // Drops all node groups that no longer needed
             for (Dataset dataset : datasets) {
@@ -1981,7 +1993,7 @@
                 // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
-                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
+                    MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, database, dataverseName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -1994,9 +2006,9 @@
         }
     }
 
-    protected boolean isDataverseNotEmpty(DataverseName dataverseName, MetadataTransactionContext mdTxnCtx)
-            throws AlgebricksException {
-        return MetadataManager.INSTANCE.isDataverseNotEmpty(mdTxnCtx, dataverseName);
+    protected boolean isDataverseNotEmpty(String database, DataverseName dataverseName,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+        return MetadataManager.INSTANCE.isDataverseNotEmpty(mdTxnCtx, database, dataverseName);
     }
 
     protected void validateDataverseStateBeforeDrop(MetadataProvider metadataProvider, Dataverse dataverse,
@@ -2031,6 +2043,7 @@
     protected boolean doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider,
             boolean ifExists, IHyracksClientConnection hcc, IRequestParameters requestParameters,
             boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc) throws Exception {
+        String database = null;
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         MutableObject<MetadataTransactionContext> mdTxnCtx =
                 new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
@@ -2040,7 +2053,7 @@
         Dataset ds = null;
         try {
             // Check if the dataverse exists
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx.getValue(), dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx.getValue(), database, dataverseName);
             if (dv == null) {
                 if (ifExists) {
                     if (warningCollector.shouldWarn()) {
@@ -2104,8 +2117,8 @@
                 mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
                 metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
                 try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName, requestParameters.isForceDropDataset());
+                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), database,
+                            dataverseName, datasetName, requestParameters.isForceDropDataset());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -2139,6 +2152,7 @@
     protected boolean doDropIndex(MetadataProvider metadataProvider, IndexDropStatement stmtIndexDrop,
             DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation();
         String indexName = stmtIndexDrop.getIndexName().getValue();
         ProgressState progress = ProgressState.NO_PROGRESS;
@@ -2154,7 +2168,8 @@
                         dataverseName);
             }
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                Index index =
+                        MetadataManager.INSTANCE.getIndex(mdTxnCtx, database, dataverseName, datasetName, indexName);
                 if (index == null) {
                     if (stmtIndexDrop.getIfExists()) {
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2183,7 +2198,7 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
                 // #. finally, delete the existing index
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, database, dataverseName, datasetName, indexName);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
@@ -2208,8 +2223,8 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName, indexName);
+                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), database,
+                            dataverseName, datasetName, indexName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -2242,11 +2257,12 @@
 
     protected void doDropFullTextFilter(MetadataProvider metadataProvider, FullTextFilterDropStatement stmtFilterDrop,
             DataverseName dataverseName, String fullTextFilterName) throws AlgebricksException, RemoteException {
+        String database = null;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             FullTextFilterMetadataEntity filter =
-                    MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, dataverseName, fullTextFilterName);
+                    MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, database, dataverseName, fullTextFilterName);
             if (filter == null) {
                 if (stmtFilterDrop.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2257,7 +2273,7 @@
                 }
             }
 
-            MetadataManager.INSTANCE.dropFullTextFilter(mdTxnCtx, dataverseName, fullTextFilterName);
+            MetadataManager.INSTANCE.dropFullTextFilter(mdTxnCtx, database, dataverseName, fullTextFilterName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -2292,6 +2308,7 @@
                     stmtConfigDrop.getSourceLocation());
         }
 
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(stmtConfigDrop.getDataverseName());
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2299,7 +2316,7 @@
 
         try {
             FullTextConfigMetadataEntity configMetadataEntity =
-                    MetadataManager.INSTANCE.getFullTextConfig(mdTxnCtx, dataverseName, fullTextConfigName);
+                    MetadataManager.INSTANCE.getFullTextConfig(mdTxnCtx, database, dataverseName, fullTextConfigName);
             if (configMetadataEntity == null) {
                 if (stmtConfigDrop.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2310,7 +2327,7 @@
                 }
             }
 
-            MetadataManager.INSTANCE.dropFullTextConfig(mdTxnCtx, dataverseName, fullTextConfigName);
+            MetadataManager.INSTANCE.dropFullTextConfig(mdTxnCtx, database, dataverseName, fullTextConfigName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -2323,6 +2340,7 @@
         SourceLocation sourceLoc = stmtTypeDrop.getSourceLocation();
         String typeName = stmtTypeDrop.getTypeName().getValue();
         metadataProvider.validateDatabaseObjectName(stmtTypeDrop.getDataverseName(), typeName, sourceLoc);
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(stmtTypeDrop.getDataverseName());
         if (isCompileOnly()) {
             return;
@@ -2332,7 +2350,7 @@
         lockUtil.dropTypeBegin(lockManager, metadataProvider.getLocks(), dataverseName, typeName);
         try {
             // Check if the dataverse exists
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 if (stmtTypeDrop.getIfExists()) {
                     if (warningCollector.shouldWarn()) {
@@ -2345,13 +2363,13 @@
                 }
             }
 
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+            Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, database, dataverseName, typeName);
             if (dt == null) {
                 if (!stmtTypeDrop.getIfExists()) {
                     throw new CompilationException(ErrorCode.UNKNOWN_TYPE, sourceLoc, typeName);
                 }
             } else {
-                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
+                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, database, dataverseName, typeName);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -2432,15 +2450,16 @@
     protected CreateResult doCreateView(MetadataProvider metadataProvider, CreateViewStatement cvs,
             DataverseName dataverseName, String viewName, DataverseName itemTypeDataverseName, String itemTypeName,
             IStatementRewriter stmtRewriter, IRequestParameters requestParameters) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = cvs.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
-            Dataset existingDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, viewName);
+            Dataset existingDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, database, dataverseName, viewName);
             if (existingDataset != null) {
                 if (DatasetUtil.isNotView(existingDataset)) {
                     throw new CompilationException(ErrorCode.DATASET_EXISTS, sourceLoc,
@@ -2478,11 +2497,13 @@
                     for (CreateViewStatement.ForeignKeyDecl foreignKeyDecl : foreignKeyDecls) {
                         List<String> foreignKeyFields =
                                 ValidateUtil.validateViewKeyFields(foreignKeyDecl, itemType, true, sourceLoc);
+                        String refDatabase = null;
                         DataverseName refDataverseName = foreignKeyDecl.getReferencedDataverseName();
                         if (refDataverseName == null) {
                             refDataverseName = dataverseName;
                         } else {
-                            Dataverse refDataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, refDataverseName);
+                            Dataverse refDataverse =
+                                    MetadataManager.INSTANCE.getDataverse(mdTxnCtx, refDatabase, refDataverseName);
                             if (refDataverse == null) {
                                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc,
                                         refDataverseName);
@@ -2623,12 +2644,13 @@
 
     protected boolean doDropView(MetadataProvider metadataProvider, ViewDropStatement stmtViewDrop,
             DataverseName dataverseName, String viewName) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = stmtViewDrop.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             // Check if the dataverse exists
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 if (stmtViewDrop.getIfExists()) {
                     if (warningCollector.shouldWarn()) {
@@ -2654,10 +2676,11 @@
                 throw new CompilationException(ErrorCode.UNKNOWN_VIEW, sourceLoc,
                         DatasetUtil.getFullyQualifiedDisplayName(dataverseName, viewName));
             }
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, viewName, false);
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, database, dataverseName, viewName, false);
             if (TypeUtil.isDatasetInlineTypeName(dataset, dataset.getItemTypeDataverseName(),
                     dataset.getItemTypeName())) {
-                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(),
+                String itemDatabase = null;
+                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, itemDatabase, dataset.getItemTypeDataverseName(),
                         dataset.getItemTypeName());
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2710,12 +2733,13 @@
     protected CreateResult doCreateFunction(MetadataProvider metadataProvider, CreateFunctionStatement cfs,
             FunctionSignature functionSignature, IStatementRewriter stmtRewriter, IRequestParameters requestParameters)
             throws Exception {
+        String database = null;
         DataverseName dataverseName = functionSignature.getDataverseName();
         SourceLocation sourceLoc = cfs.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
@@ -2799,12 +2823,14 @@
                     newInlineTypes.put(returnTypeSignature, returnInlineTypeEntity);
                 }
 
+                String libraryDatabase = null;
                 DataverseName libraryDataverseName = cfs.getLibraryDataverseName();
                 if (libraryDataverseName == null) {
                     libraryDataverseName = dataverseName;
                 }
                 String libraryName = cfs.getLibraryName();
-                Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverseName, libraryName);
+                Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDatabase, libraryDataverseName,
+                        libraryName);
                 if (library == null) {
                     throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, sourceLoc, libraryName);
                 }
@@ -2873,8 +2899,9 @@
                     Datatype newInlineType =
                             newInlineTypes.isEmpty() ? null : newInlineTypes.remove(existingInlineType);
                     if (newInlineType == null) {
-                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, existingInlineType.getDataverseName(),
-                                existingInlineType.getName());
+                        String existingInlineTypeDatabase = null;
+                        MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, existingInlineTypeDatabase,
+                                existingInlineType.getDataverseName(), existingInlineType.getName());
                     } else {
                         MetadataManager.INSTANCE.updateDatatype(mdTxnCtx, newInlineType);
                     }
@@ -2961,12 +2988,13 @@
 
     protected boolean doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction,
             FunctionSignature signature, IRequestParameters requestParameters) throws Exception {
+        String database = null;
         DataverseName dataverseName = signature.getDataverseName();
         SourceLocation sourceLoc = stmtDropFunction.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dataverse == null) {
                 if (stmtDropFunction.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2989,7 +3017,9 @@
 
             MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             for (TypeSignature inlineType : inlineTypes) {
-                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, inlineType.getDataverseName(), inlineType.getName());
+                String inlineTypeDatabase = null;
+                MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, inlineTypeDatabase, inlineType.getDataverseName(),
+                        inlineType.getName());
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
@@ -3026,13 +3056,15 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
+            String database = null;
             DataverseName dataverseName = getActiveDataverseName(cas.getDataverseName());
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
             String adapterName = cas.getAdapterName();
-            DatasourceAdapter adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+            DatasourceAdapter adapter =
+                    MetadataManager.INSTANCE.getAdapter(mdTxnCtx, database, dataverseName, adapterName);
             if (adapter != null) {
                 if (cas.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3041,12 +3073,14 @@
                 throw new CompilationException(ErrorCode.ADAPTER_EXISTS, sourceLoc, adapterName);
             }
 
+            String libraryDatabase = null;
             DataverseName libraryDataverseName = cas.getLibraryDataverseName();
             if (libraryDataverseName == null) {
                 libraryDataverseName = dataverseName;
             }
             String libraryName = cas.getLibraryName();
-            Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverseName, libraryName);
+            Library library =
+                    MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDatabase, libraryDataverseName, libraryName);
             if (library == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, sourceLoc, libraryName);
             }
@@ -3095,11 +3129,12 @@
 
     protected boolean doDropAdapter(MetadataProvider metadataProvider, AdapterDropStatement stmtDropAdapter,
             DataverseName dataverseName, String adapterName) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = stmtDropAdapter.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dataverse == null) {
                 if (stmtDropAdapter.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3108,7 +3143,8 @@
                     throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
                 }
             }
-            DatasourceAdapter adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+            DatasourceAdapter adapter =
+                    MetadataManager.INSTANCE.getAdapter(mdTxnCtx, database, dataverseName, adapterName);
             if (adapter == null) {
                 if (stmtDropAdapter.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3118,7 +3154,7 @@
                 }
             }
 
-            MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, dataverseName, adapterName);
+            MetadataManager.INSTANCE.dropAdapter(mdTxnCtx, database, dataverseName, adapterName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
         } catch (Exception e) {
@@ -3149,6 +3185,7 @@
     protected CreateResult doCreateLibrary(MetadataProvider metadataProvider, DataverseName dataverseName,
             String libraryName, String libraryHash, CreateLibraryStatement cls, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
+        String database = null;
         JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
         boolean prepareJobSuccessful = false;
         JobSpecification abortJobSpec = null;
@@ -3157,12 +3194,12 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, dataverseName);
             }
             ExternalFunctionLanguage language = cls.getLang();
-            existingLibrary = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName);
+            existingLibrary = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, database, dataverseName, libraryName);
             if (existingLibrary != null && !cls.getReplaceIfExists()) {
                 throw new CompilationException(ErrorCode.COMPILATION_ERROR,
                         "A library with this name " + libraryName + " already exists.");
@@ -3238,7 +3275,7 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
                     if (existingLibrary == null) {
-                        MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+                        MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, database, dataverseName, libraryName);
                     } else {
                         MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, existingLibrary);
                     }
@@ -3280,12 +3317,13 @@
     protected boolean doDropLibrary(MetadataProvider metadataProvider, LibraryDropStatement stmtDropLibrary,
             DataverseName dataverseName, String libraryName, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
+        String database = null;
         JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dataverse == null) {
                 if (stmtDropLibrary.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3295,7 +3333,7 @@
                             dataverseName);
                 }
             }
-            Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, dataverseName, libraryName);
+            Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, database, dataverseName, libraryName);
             if (library == null) {
                 if (stmtDropLibrary.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3308,7 +3346,7 @@
 
             // #. mark the existing library as PendingDropOp
             // do drop instead of update because drop will fail if the library is used by functions/adapters
-            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, database, dataverseName, libraryName);
             MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverseName, libraryName, library.getLanguage(),
                     library.getHash(), MetadataUtil.PENDING_DROP_OP));
 
@@ -3329,7 +3367,7 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. drop library
-            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, database, dataverseName, libraryName);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
@@ -3341,7 +3379,7 @@
                 // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
-                    MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
+                    MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, database, dataverseName, libraryName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -3377,14 +3415,15 @@
     protected CreateResult doCreateSynonym(MetadataProvider metadataProvider, CreateSynonymStatement css,
             DataverseName dataverseName, String synonymName, DataverseName objectDataverseName, String objectName)
             throws Exception {
+        String database = null;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, css.getSourceLocation(), dataverseName);
             }
-            Synonym synonym = MetadataManager.INSTANCE.getSynonym(metadataProvider.getMetadataTxnContext(),
+            Synonym synonym = MetadataManager.INSTANCE.getSynonym(metadataProvider.getMetadataTxnContext(), database,
                     dataverseName, synonymName);
             if (synonym != null) {
                 if (css.getIfNotExists()) {
@@ -3426,10 +3465,11 @@
 
     protected boolean doDropSynonym(MetadataProvider metadataProvider, SynonymDropStatement stmtSynDrop,
             DataverseName dataverseName, String synonymName) throws Exception {
+        String database = null;
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            Synonym synonym = MetadataManager.INSTANCE.getSynonym(mdTxnCtx, dataverseName, synonymName);
+            Synonym synonym = MetadataManager.INSTANCE.getSynonym(mdTxnCtx, database, dataverseName, synonymName);
             if (synonym == null) {
                 if (stmtSynDrop.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3437,7 +3477,7 @@
                 }
                 throw new CompilationException(ErrorCode.UNKNOWN_SYNONYM, stmtSynDrop.getSourceLocation(), synonymName);
             }
-            MetadataManager.INSTANCE.dropSynonym(mdTxnCtx, dataverseName, synonymName);
+            MetadataManager.INSTANCE.dropSynonym(mdTxnCtx, database, dataverseName, synonymName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
         } catch (Exception e) {
@@ -3508,8 +3548,9 @@
                 throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, stmt.getSourceLocation(),
                         datasetName, dataverseName);
             }
-            Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(),
-                    dataset.getItemTypeName());
+            String itemTypeDatabase = null;
+            Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, itemTypeDatabase,
+                    dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             // Copy statement with csv files will have a type expression
             if (copyStmt.getTypeExpr() != null) {
                 TypeExpression itemTypeExpr = copyStmt.getTypeExpr();
@@ -3794,6 +3835,7 @@
         SourceLocation sourceLoc = cfs.getSourceLocation();
         String feedName = cfs.getFeedName().getValue();
         metadataProvider.validateDatabaseObjectName(cfs.getDataverseName(), feedName, sourceLoc);
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(cfs.getDataverseName());
         if (isCompileOnly()) {
             return;
@@ -3802,8 +3844,8 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.createFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
         try {
-            Feed feed =
-                    MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
+            Feed feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), database,
+                    dataverseName, feedName);
             if (feed != null) {
                 if (cfs.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3836,6 +3878,7 @@
         SourceLocation sourceLoc = cfps.getSourceLocation();
         String policyName = cfps.getPolicyName();
         metadataProvider.validateDatabaseObjectName(null, policyName, sourceLoc);
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(null);
         if (isCompileOnly()) {
             return;
@@ -3845,7 +3888,7 @@
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
-                    .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverseName, policyName);
+                    .getFeedPolicy(metadataProvider.getMetadataTxnContext(), database, dataverseName, policyName);
             if (feedPolicy != null) {
                 if (cfps.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -3859,10 +3902,10 @@
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
                 FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
-                        metadataProvider.getMetadataTxnContext(), dataverseName, cfps.getSourcePolicyName());
+                        metadataProvider.getMetadataTxnContext(), database, dataverseName, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
                     sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
-                            MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                            database, MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
                     if (sourceFeedPolicy == null) {
                         throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                                 "Unknown policy " + cfps.getSourcePolicyName());
@@ -3900,6 +3943,7 @@
         SourceLocation sourceLoc = stmtFeedDrop.getSourceLocation();
         String feedName = stmtFeedDrop.getFeedName().getValue();
         metadataProvider.validateDatabaseObjectName(stmtFeedDrop.getDataverseName(), feedName, sourceLoc);
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(stmtFeedDrop.getDataverseName());
         if (isCompileOnly()) {
             return;
@@ -3908,7 +3952,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.dropFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, feedName);
         try {
-            Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
+            Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, database, dataverseName, feedName);
             if (feed == null) {
                 if (!stmtFeedDrop.getIfExists()) {
                     throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -3941,10 +3985,11 @@
         } else if (listener != null) {
             listener.unregister();
         }
-        JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
-                MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverseName(), feedId.getEntityName()));
+        String feedDatabase = null;
+        JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider, MetadataManager.INSTANCE
+                .getFeed(mdTxnCtx, feedDatabase, feedId.getDataverseName(), feedId.getEntityName()));
         runJob(hcc, spec);
-        MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(), feed.getFeedName());
+        MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feedDatabase, feed.getDataverseName(), feed.getFeedName());
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Removed feed " + feedId);
         }
@@ -3955,6 +4000,7 @@
         SourceLocation sourceLoc = stmtFeedPolicyDrop.getSourceLocation();
         String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
         metadataProvider.validateDatabaseObjectName(stmtFeedPolicyDrop.getDataverseName(), policyName, sourceLoc);
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(stmtFeedPolicyDrop.getDataverseName());
         if (isCompileOnly()) {
             return;
@@ -3963,7 +4009,8 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         lockUtil.dropFeedPolicyBegin(lockManager, metadataProvider.getLocks(), dataverseName, policyName);
         try {
-            FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
+            FeedPolicyEntity feedPolicy =
+                    MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, database, dataverseName, policyName);
             if (feedPolicy == null) {
                 if (!stmtFeedPolicyDrop.getIfExists()) {
                     throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
@@ -3972,7 +4019,7 @@
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 return;
             }
-            MetadataManager.INSTANCE.dropFeedPolicy(mdTxnCtx, dataverseName, policyName);
+            MetadataManager.INSTANCE.dropFeedPolicy(mdTxnCtx, database, dataverseName, policyName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
@@ -3986,6 +4033,7 @@
             IHyracksClientConnection hcc) throws Exception {
         StartFeedStatement sfs = (StartFeedStatement) stmt;
         SourceLocation sourceLoc = sfs.getSourceLocation();
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(sfs.getDataverseName());
         String feedName = sfs.getFeedName().getValue();
         if (isCompileOnly()) {
@@ -4002,7 +4050,7 @@
             Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
                     metadataProvider.getMetadataTxnContext());
             List<FeedConnection> feedConnections = MetadataManager.INSTANCE
-                    .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
+                    .getFeedConections(metadataProvider.getMetadataTxnContext(), database, dataverseName, feedName);
             if (feedConnections.isEmpty()) {
                 throw new CompilationException(ErrorCode.FEED_START_FEED_WITHOUT_CONNECTION, sourceLoc, feedName);
             }
@@ -4068,6 +4116,7 @@
         FeedConnection fc;
         ConnectFeedStatement cfs = (ConnectFeedStatement) stmt;
         SourceLocation sourceLoc = cfs.getSourceLocation();
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(cfs.getDataverseName());
         String feedName = cfs.getFeedName();
         String datasetName = cfs.getDatasetName().getValue();
@@ -4103,8 +4152,8 @@
                             func.getName());
                 }
             }
-            fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    feedName, datasetName);
+            fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), database,
+                    dataverseName, feedName, datasetName);
             if (fc != null) {
                 throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                         "Feed" + feedName + " is already connected to " + dataset() + " " + datasetName);
@@ -4128,6 +4177,7 @@
     protected void handleDisconnectFeedStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
         SourceLocation sourceLoc = cfs.getSourceLocation();
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(cfs.getDataverseName());
         String datasetName = cfs.getDatasetName().getValue();
         String feedName = cfs.getFeedName().getValue();
@@ -4150,7 +4200,7 @@
             FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue());
             FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
             FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
-                    dataverseName, feedName, datasetName);
+                    database, dataverseName, feedName, datasetName);
             Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
@@ -4160,7 +4210,7 @@
                 throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Feed " + feedName
                         + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!");
             }
-            MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
+            MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx, database, dataverseName, feedName, datasetName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             if (listener != null) {
                 listener.remove(ds);
@@ -4194,6 +4244,7 @@
     protected void doAnalyzeDataset(MetadataProvider metadataProvider, AnalyzeStatement stmtAnalyze,
             DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
             IRequestParameters requestParameters) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = stmtAnalyze.getSourceLocation();
         ProgressState progressNewIndexCreate = ProgressState.NO_PROGRESS;
         ProgressState progressExistingIndexDrop = ProgressState.NO_PROGRESS;
@@ -4205,7 +4256,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             // Check if the dataverse exists
-            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+            Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, database, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
@@ -4224,12 +4275,12 @@
             IndexType sampleIndexType = IndexType.SAMPLE;
             Pair<String, String> sampleIndexNames = IndexUtil.getSampleIndexNames(datasetName);
             String newIndexName;
-            existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName, sampleIndexNames.first);
+            existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), database,
+                    dataverseName, datasetName, sampleIndexNames.first);
             if (existingIndex != null) {
                 newIndexName = sampleIndexNames.second;
             } else {
-                existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
+                existingIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), database,
                         dataverseName, datasetName, sampleIndexNames.second);
                 newIndexName = sampleIndexNames.first;
             }
@@ -4292,8 +4343,8 @@
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             // #. add same new index with PendingNoOp after deleting its entry with PendingAddOp
             MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                    newIndexPendingAdd.getDataverseName(), newIndexPendingAdd.getDatasetName(),
-                    newIndexPendingAdd.getIndexName());
+                    newIndexPendingAdd.getDatabaseName(), newIndexPendingAdd.getDataverseName(),
+                    newIndexPendingAdd.getDatasetName(), newIndexPendingAdd.getIndexName());
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), newIndexFinal);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -4305,7 +4356,8 @@
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                        existingIndex.getDataverseName(), existingIndex.getDatasetName(), existingIndex.getIndexName());
+                        existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
+                        existingIndex.getDatasetName(), existingIndex.getIndexName());
                 existingIndex.setPendingOp(MetadataUtil.PENDING_DROP_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), existingIndex);
                 existingIndexDropSpec = IndexUtil.buildDropIndexJobSpec(existingIndex, metadataProvider, ds, sourceLoc);
@@ -4321,7 +4373,8 @@
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                        existingIndex.getDataverseName(), existingIndex.getDatasetName(), existingIndex.getIndexName());
+                        existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
+                        existingIndex.getDatasetName(), existingIndex.getIndexName());
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progressExistingIndexDrop = ProgressState.NO_PROGRESS;
@@ -4346,8 +4399,8 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
                     MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                            existingIndex.getDataverseName(), existingIndex.getDatasetName(),
-                            existingIndex.getIndexName());
+                            existingIndex.getDatabaseName(), existingIndex.getDataverseName(),
+                            existingIndex.getDatasetName(), existingIndex.getIndexName());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -4378,8 +4431,8 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
                     MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                            newIndexPendingAdd.getDataverseName(), newIndexPendingAdd.getDatasetName(),
-                            newIndexPendingAdd.getIndexName());
+                            newIndexPendingAdd.getDatabaseName(), newIndexPendingAdd.getDataverseName(),
+                            newIndexPendingAdd.getDatasetName(), newIndexPendingAdd.getIndexName());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
@@ -4415,6 +4468,7 @@
     protected boolean doAnalyzeDatasetDrop(MetadataProvider metadataProvider, AnalyzeDropStatement stmtIndexDrop,
             DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
             IRequestParameters requestParams) throws Exception {
+        String database = null;
         SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation();
         Pair<String, String> sampleIndexNames = IndexUtil.getSampleIndexNames(datasetName);
         String indexName1 = sampleIndexNames.first;
@@ -4435,8 +4489,10 @@
             if (ds.getDatasetType() != DatasetType.INTERNAL) {
                 throw new CompilationException(ErrorCode.OPERATION_NOT_SUPPORTED, sourceLoc);
             }
-            Index index1 = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName1);
-            Index index2 = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName2);
+            Index index1 =
+                    MetadataManager.INSTANCE.getIndex(mdTxnCtx, database, dataverseName, datasetName, indexName1);
+            Index index2 =
+                    MetadataManager.INSTANCE.getIndex(mdTxnCtx, database, dataverseName, datasetName, indexName2);
             index1Exists = index1 != null;
             index2Exists = index2 != null;
             if (!index1Exists && !index2Exists) {
@@ -4466,10 +4522,10 @@
 
             // #. finally, delete the existing indexes
             if (index1Exists) {
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName1);
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, database, dataverseName, datasetName, indexName1);
             }
             if (index2Exists) {
-                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName2);
+                MetadataManager.INSTANCE.dropIndex(mdTxnCtx, database, dataverseName, datasetName, indexName2);
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -4495,12 +4551,12 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
                     if (index1Exists) {
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                                datasetName, indexName1);
+                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), database,
+                                dataverseName, datasetName, indexName1);
                     }
                     if (index2Exists) {
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                                datasetName, indexName2);
+                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), database,
+                                dataverseName, datasetName, indexName2);
                     }
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
@@ -4531,12 +4587,13 @@
     private void prepareIndexDrop(MetadataProvider metadataProvider, DataverseName dataverseName, String datasetName,
             SourceLocation sourceLoc, String indexName, List<JobSpecification> jobsToExecute,
             MetadataTransactionContext mdTxnCtx, Dataset ds, Index index) throws AlgebricksException {
+        String database = null;
         if (index != null) {
             // #. prepare a job to drop the index in NC.
             jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds, sourceLoc));
 
             // #. mark PendingDropOp on the existing index
-            MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+            MetadataManager.INSTANCE.dropIndex(mdTxnCtx, database, dataverseName, datasetName, indexName);
             MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                     new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getIndexDetails(),
                             index.isEnforced(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
@@ -4547,6 +4604,7 @@
             IHyracksClientConnection hcc) throws Exception {
         CompactStatement compactStatement = (CompactStatement) stmt;
         SourceLocation sourceLoc = compactStatement.getSourceLocation();
+        String database = null;
         DataverseName dataverseName = getActiveDataverseName(compactStatement.getDataverseName());
         String datasetName = compactStatement.getDatasetName().getValue();
         if (isCompileOnly()) {
@@ -4564,13 +4622,14 @@
                         dataverseName);
             }
             // Prepare jobs to compact the datatset and its indexes
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+            List<Index> indexes =
+                    MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, database, dataverseName, datasetName);
             if (indexes.isEmpty()) {
                 throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                         "Cannot compact the external " + dataset() + " " + datasetName + " because it has no indexes");
             }
-            Dataverse dataverse =
-                    MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
+            Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                    database, dataverseName);
             jobsToExecute.add(DatasetUtil.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index df2c25d..ed9d0c6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -137,8 +137,8 @@
         List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
         IntOpenHashSet validDatasetIds = new IntOpenHashSet();
         for (Dataverse dataverse : dataverses) {
-            List<Dataset> dataverseDatasets =
-                    MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName());
+            List<Dataset> dataverseDatasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                    dataverse.getDatabaseName(), dataverse.getDataverseName());
             dataverseDatasets.stream().filter(DatasetUtil::isNotView).mapToInt(Dataset::getDatasetId)
                     .forEach(validDatasetIds::add);
         }
@@ -159,7 +159,7 @@
         // Loop over datasets
         for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
             // Fixes ASTERIXDB-2386 by caching the dataverse during recovery
-            MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse.getDataverseName());
+            MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse.getDatabaseName(), dataverse.getDataverseName());
         }
         return mdTxnCtx;
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 837d2cb..2b78526 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -465,7 +465,7 @@
             // drop dataset entry from metadata
             runMetadataTransaction(metadataProvider,
                     () -> MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(),
-                            dataset.getDataverseName(), dataset.getDatasetName(), true));
+                            dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(), true));
             MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
             // try to drop the dataset's node group
             runMetadataTransaction(metadataProvider, () -> tryDropDatasetNodegroup(dataset, metadataProvider));
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
index bb52946..ba89e4c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -152,7 +152,7 @@
             MetadataProvider metadataProver = MetadataProvider.create(appCtx, null);
             metadataProver.setMetadataTxnContext(mdTxn);
             final DataverseName defaultDv = MetadataBuiltinEntities.DEFAULT_DATAVERSE.getDataverseName();
-            final Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxn, defaultDv, datasetName);
+            final Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxn, null, defaultDv, datasetName);
             MetadataManager.INSTANCE.commitTransaction(mdTxn);
             FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset,
                     indexName, Arrays.asList("asterix_nc1"));
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 83c374f..547be91 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -179,9 +179,10 @@
     }
 
     @Override
-    public void dropDataverse(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException {
+    public void dropDataverse(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
+            throws AlgebricksException {
         try {
-            metadataNode.dropDataverse(ctx.getTxnId(), null, dataverseName);
+            metadataNode.dropDataverse(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -189,10 +190,10 @@
     }
 
     @Override
-    public boolean isDataverseNotEmpty(MetadataTransactionContext ctx, DataverseName dataverseName)
+    public boolean isDataverseNotEmpty(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException {
         try {
-            return metadataNode.isDataverseNotEmpty(ctx.getTxnId(), null, dataverseName);
+            return metadataNode.isDataverseNotEmpty(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -208,8 +209,10 @@
     }
 
     @Override
-    public Dataverse getDataverse(MetadataTransactionContext ctx, DataverseName dataverseName)
+    public Dataverse getDataverse(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException {
+        //TODO(DB): change cache to consider database
+
         // First look in the context to see if this transaction created the
         // requested dataverse itself (but the dataverse is still uncommitted).
         Dataverse dataverse = ctx.getDataverse(dataverseName);
@@ -229,7 +232,7 @@
             return dataverse;
         }
         try {
-            dataverse = metadataNode.getDataverse(ctx.getTxnId(), null, dataverseName);
+            dataverse = metadataNode.getDataverse(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -242,12 +245,12 @@
     }
 
     @Override
-    public List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, DataverseName dataverseName)
-            throws AlgebricksException {
+    public List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName) throws AlgebricksException {
         List<Dataset> dataverseDatasets;
         try {
             // Assuming that the transaction can read its own writes on the metadata node.
-            dataverseDatasets = metadataNode.getDataverseDatasets(ctx.getTxnId(), null, dataverseName);
+            dataverseDatasets = metadataNode.getDataverseDatasets(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -269,10 +272,10 @@
     }
 
     @Override
-    public void dropDataset(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName,
-            boolean force) throws AlgebricksException {
+    public void dropDataset(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datasetName, boolean force) throws AlgebricksException {
         try {
-            metadataNode.dropDataset(ctx.getTxnId(), null, dataverseName, datasetName, force);
+            metadataNode.dropDataset(ctx.getTxnId(), database, dataverseName, datasetName, force);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -281,8 +284,8 @@
     }
 
     @Override
-    public Dataset getDataset(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName)
-            throws AlgebricksException {
+    public Dataset getDataset(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datasetName) throws AlgebricksException {
 
         // First look in the context to see if this transaction created the
         // requested dataset itself (but the dataset is still uncommitted).
@@ -304,7 +307,7 @@
             return dataset;
         }
         try {
-            dataset = metadataNode.getDataset(ctx.getTxnId(), null, dataverseName, datasetName);
+            dataset = metadataNode.getDataset(ctx.getTxnId(), database, dataverseName, datasetName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -317,15 +320,15 @@
     }
 
     @Override
-    public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, DataverseName dataverseName,
+    public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
             String datasetName) throws AlgebricksException {
-        Dataset dataset = getDataset(ctx, dataverseName, datasetName);
+        Dataset dataset = getDataset(ctx, database, dataverseName, datasetName);
         if (dataset == null) {
             return Collections.emptyList();
         }
         List<Index> datasetIndexes;
         try {
-            datasetIndexes = metadataNode.getDatasetIndexes(ctx.getTxnId(), null, dataverseName, datasetName);
+            datasetIndexes = metadataNode.getDatasetIndexes(ctx.getTxnId(), database, dataverseName, datasetName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -344,11 +347,11 @@
     }
 
     @Override
-    public CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, DataverseName dataverse,
-            String policyName) throws AlgebricksException {
+    public CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, String database,
+            DataverseName dataverse, String policyName) throws AlgebricksException {
         CompactionPolicy compactionPolicy;
         try {
-            compactionPolicy = metadataNode.getCompactionPolicy(ctx.getTxnId(), null, dataverse, policyName);
+            compactionPolicy = metadataNode.getCompactionPolicy(ctx.getTxnId(), database, dataverse, policyName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -371,10 +374,10 @@
     }
 
     @Override
-    public void dropDatatype(MetadataTransactionContext ctx, DataverseName dataverseName, String datatypeName)
-            throws AlgebricksException {
+    public void dropDatatype(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datatypeName) throws AlgebricksException {
         try {
-            metadataNode.dropDatatype(ctx.getTxnId(), null, dataverseName, datatypeName);
+            metadataNode.dropDatatype(ctx.getTxnId(), database, dataverseName, datatypeName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -382,8 +385,8 @@
     }
 
     @Override
-    public Datatype getDatatype(MetadataTransactionContext ctx, DataverseName dataverseName, String datatypeName)
-            throws AlgebricksException {
+    public Datatype getDatatype(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datatypeName) throws AlgebricksException {
         // First look in the context to see if this transaction created the
         // requested datatype itself (but the datatype is still uncommitted).
         Datatype datatype = ctx.getDatatype(dataverseName, datatypeName);
@@ -404,7 +407,7 @@
             return datatype;
         }
         try {
-            datatype = metadataNode.getDatatype(ctx.getTxnId(), null, dataverseName, datatypeName);
+            datatype = metadataNode.getDatatype(ctx.getTxnId(), database, dataverseName, datatypeName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -437,10 +440,10 @@
     }
 
     @Override
-    public void dropIndex(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName,
-            String indexName) throws AlgebricksException {
+    public void dropIndex(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datasetName, String indexName) throws AlgebricksException {
         try {
-            metadataNode.dropIndex(ctx.getTxnId(), null, dataverseName, datasetName, indexName);
+            metadataNode.dropIndex(ctx.getTxnId(), database, dataverseName, datasetName, indexName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -448,8 +451,8 @@
     }
 
     @Override
-    public Index getIndex(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName,
-            String indexName) throws AlgebricksException {
+    public Index getIndex(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datasetName, String indexName) throws AlgebricksException {
 
         // First look in the context to see if this transaction created the
         // requested index itself (but the index is still uncommitted).
@@ -472,7 +475,7 @@
             return index;
         }
         try {
-            index = metadataNode.getIndex(ctx.getTxnId(), null, dataverseName, datasetName, indexName);
+            index = metadataNode.getIndex(ctx.getTxnId(), database, dataverseName, datasetName, indexName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -621,10 +624,10 @@
     }
 
     @Override
-    public List<Function> getDataverseFunctions(MetadataTransactionContext ctx, DataverseName dataverseName)
-            throws AlgebricksException {
+    public List<Function> getDataverseFunctions(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName) throws AlgebricksException {
         try {
-            return metadataNode.getDataverseFunctions(ctx.getTxnId(), null, dataverseName);
+            return metadataNode.getDataverseFunctions(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -642,10 +645,10 @@
     }
 
     @Override
-    public void dropFullTextFilter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String filterName)
-            throws AlgebricksException {
+    public void dropFullTextFilter(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String filterName) throws AlgebricksException {
         try {
-            metadataNode.dropFullTextFilter(mdTxnCtx.getTxnId(), null, dataverseName, filterName);
+            metadataNode.dropFullTextFilter(mdTxnCtx.getTxnId(), database, dataverseName, filterName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -653,8 +656,8 @@
     }
 
     @Override
-    public FullTextFilterMetadataEntity getFullTextFilter(MetadataTransactionContext ctx, DataverseName dataverseName,
-            String filterName) throws AlgebricksException {
+    public FullTextFilterMetadataEntity getFullTextFilter(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName, String filterName) throws AlgebricksException {
         // First look in the context to see if this transaction created the
         // requested full-text filter itself (but the full-text filter is still uncommitted).
         FullTextFilterMetadataEntity filter = ctx.getFullTextFilter(dataverseName, filterName);
@@ -683,7 +686,7 @@
         }
 
         try {
-            filter = metadataNode.getFullTextFilter(ctx.getTxnId(), null, dataverseName, filterName);
+            filter = metadataNode.getFullTextFilter(ctx.getTxnId(), database, dataverseName, filterName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -711,8 +714,8 @@
     }
 
     @Override
-    public FullTextConfigMetadataEntity getFullTextConfig(MetadataTransactionContext ctx, DataverseName dataverseName,
-            String configName) throws AlgebricksException {
+    public FullTextConfigMetadataEntity getFullTextConfig(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName, String configName) throws AlgebricksException {
         // First look in the context to see if this transaction created the
         // requested full-text config itself (but the full-text config is still uncommitted).
         FullTextConfigMetadataEntity configMetadataEntity = ctx.getFullTextConfig(dataverseName, configName);
@@ -741,7 +744,7 @@
         }
 
         try {
-            configMetadataEntity = metadataNode.getFullTextConfig(ctx.getTxnId(), null, dataverseName, configName);
+            configMetadataEntity = metadataNode.getFullTextConfig(ctx.getTxnId(), database, dataverseName, configName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -755,10 +758,10 @@
     }
 
     @Override
-    public void dropFullTextConfig(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String configName)
-            throws AlgebricksException {
+    public void dropFullTextConfig(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String configName) throws AlgebricksException {
         try {
-            metadataNode.dropFullTextConfig(mdTxnCtx.getTxnId(), null, dataverseName, configName);
+            metadataNode.dropFullTextConfig(mdTxnCtx.getTxnId(), database, dataverseName, configName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -795,10 +798,10 @@
     }
 
     @Override
-    public void dropAdapter(MetadataTransactionContext ctx, DataverseName dataverseName, String name)
+    public void dropAdapter(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String name)
             throws AlgebricksException {
         try {
-            metadataNode.dropAdapter(ctx.getTxnId(), null, dataverseName, name);
+            metadataNode.dropAdapter(ctx.getTxnId(), database, dataverseName, name);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -806,11 +809,11 @@
     }
 
     @Override
-    public DatasourceAdapter getAdapter(MetadataTransactionContext ctx, DataverseName dataverseName, String name)
-            throws AlgebricksException {
+    public DatasourceAdapter getAdapter(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String name) throws AlgebricksException {
         DatasourceAdapter adapter;
         try {
-            adapter = metadataNode.getAdapter(ctx.getTxnId(), null, dataverseName, name);
+            adapter = metadataNode.getAdapter(ctx.getTxnId(), database, dataverseName, name);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -818,10 +821,10 @@
     }
 
     @Override
-    public void dropLibrary(MetadataTransactionContext ctx, DataverseName dataverseName, String libraryName)
-            throws AlgebricksException {
+    public void dropLibrary(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String libraryName) throws AlgebricksException {
         try {
-            metadataNode.dropLibrary(ctx.getTxnId(), null, dataverseName, libraryName);
+            metadataNode.dropLibrary(ctx.getTxnId(), database, dataverseName, libraryName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -829,13 +832,13 @@
     }
 
     @Override
-    public List<Library> getDataverseLibraries(MetadataTransactionContext ctx, DataverseName dataverseName)
-            throws AlgebricksException {
+    public List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName) throws AlgebricksException {
         List<Library> dataverseLibaries;
         try {
             // Assuming that the transaction can read its own writes on the
             // metadata node.
-            dataverseLibaries = metadataNode.getDataverseLibraries(ctx.getTxnId(), null, dataverseName);
+            dataverseLibaries = metadataNode.getDataverseLibraries(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -855,11 +858,11 @@
     }
 
     @Override
-    public Library getLibrary(MetadataTransactionContext ctx, DataverseName dataverseName, String libraryName)
-            throws AlgebricksException {
+    public Library getLibrary(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String libraryName) throws AlgebricksException {
         Library library;
         try {
-            library = metadataNode.getLibrary(ctx.getTxnId(), null, dataverseName, libraryName);
+            library = metadataNode.getLibrary(ctx.getTxnId(), database, dataverseName, libraryName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -867,11 +870,11 @@
     }
 
     @Override
-    public FeedPolicyEntity getFeedPolicy(MetadataTransactionContext ctx, DataverseName dataverseName,
+    public FeedPolicyEntity getFeedPolicy(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
             String policyName) throws AlgebricksException {
         FeedPolicyEntity feedPolicy;
         try {
-            feedPolicy = metadataNode.getFeedPolicy(ctx.getTxnId(), null, dataverseName, policyName);
+            feedPolicy = metadataNode.getFeedPolicy(ctx.getTxnId(), database, dataverseName, policyName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -879,11 +882,11 @@
     }
 
     @Override
-    public Feed getFeed(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName)
+    public Feed getFeed(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String feedName)
             throws AlgebricksException {
         Feed feed;
         try {
-            feed = metadataNode.getFeed(ctx.getTxnId(), null, dataverseName, feedName);
+            feed = metadataNode.getFeed(ctx.getTxnId(), database, dataverseName, feedName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -891,10 +894,11 @@
     }
 
     @Override
-    public List<Feed> getFeeds(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException {
+    public List<Feed> getFeeds(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
+            throws AlgebricksException {
         List<Feed> feeds;
         try {
-            feeds = metadataNode.getFeeds(ctx.getTxnId(), null, dataverseName);
+            feeds = metadataNode.getFeeds(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -902,16 +906,16 @@
     }
 
     @Override
-    public void dropFeed(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName)
+    public void dropFeed(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String feedName)
             throws AlgebricksException {
         Feed feed;
         List<FeedConnection> feedConnections;
         try {
-            feed = metadataNode.getFeed(ctx.getTxnId(), null, dataverseName, feedName);
-            feedConnections = metadataNode.getFeedConnections(ctx.getTxnId(), null, dataverseName, feedName);
-            metadataNode.dropFeed(ctx.getTxnId(), null, dataverseName, feedName);
+            feed = metadataNode.getFeed(ctx.getTxnId(), database, dataverseName, feedName);
+            feedConnections = metadataNode.getFeedConnections(ctx.getTxnId(), database, dataverseName, feedName);
+            metadataNode.dropFeed(ctx.getTxnId(), database, dataverseName, feedName);
             for (FeedConnection feedConnection : feedConnections) {
-                metadataNode.dropFeedConnection(ctx.getTxnId(), null, dataverseName, feedName,
+                metadataNode.dropFeedConnection(ctx.getTxnId(), database, dataverseName, feedName,
                         feedConnection.getDatasetName());
                 ctx.dropFeedConnection(dataverseName, feedName, feedConnection.getDatasetName());
             }
@@ -943,10 +947,10 @@
     }
 
     @Override
-    public void dropFeedConnection(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName,
-            String datasetName) throws AlgebricksException {
+    public void dropFeedConnection(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String feedName, String datasetName) throws AlgebricksException {
         try {
-            metadataNode.dropFeedConnection(ctx.getTxnId(), null, dataverseName, feedName, datasetName);
+            metadataNode.dropFeedConnection(ctx.getTxnId(), database, dataverseName, feedName, datasetName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -954,31 +958,31 @@
     }
 
     @Override
-    public FeedConnection getFeedConnection(MetadataTransactionContext ctx, DataverseName dataverseName,
-            String feedName, String datasetName) throws AlgebricksException {
+    public FeedConnection getFeedConnection(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName, String feedName, String datasetName) throws AlgebricksException {
         try {
-            return metadataNode.getFeedConnection(ctx.getTxnId(), null, dataverseName, feedName, datasetName);
+            return metadataNode.getFeedConnection(ctx.getTxnId(), database, dataverseName, feedName, datasetName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
     @Override
-    public List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, DataverseName dataverseName,
-            String feedName) throws AlgebricksException {
+    public List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName, String feedName) throws AlgebricksException {
         try {
-            return metadataNode.getFeedConnections(ctx.getTxnId(), null, dataverseName, feedName);
+            return metadataNode.getFeedConnections(ctx.getTxnId(), database, dataverseName, feedName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
     @Override
-    public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx,
+    public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String database,
             DataverseName dataverseName) throws AlgebricksException {
         List<DatasourceAdapter> dataverseAdapters;
         try {
-            dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getTxnId(), null, dataverseName);
+            dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -986,12 +990,12 @@
     }
 
     @Override
-    public void dropFeedPolicy(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String policyName)
-            throws AlgebricksException {
+    public void dropFeedPolicy(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String policyName) throws AlgebricksException {
         FeedPolicyEntity feedPolicy;
         try {
-            feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getTxnId(), null, dataverseName, policyName);
-            metadataNode.dropFeedPolicy(mdTxnCtx.getTxnId(), null, dataverseName, policyName);
+            feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getTxnId(), database, dataverseName, policyName);
+            metadataNode.dropFeedPolicy(mdTxnCtx.getTxnId(), database, dataverseName, policyName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -999,11 +1003,11 @@
     }
 
     @Override
-    public List<FeedPolicyEntity> getDataverseFeedPolicies(MetadataTransactionContext mdTxnCtx,
+    public List<FeedPolicyEntity> getDataverseFeedPolicies(MetadataTransactionContext mdTxnCtx, String database,
             DataverseName dataverseName) throws AlgebricksException {
         List<FeedPolicyEntity> dataverseFeedPolicies;
         try {
-            dataverseFeedPolicies = metadataNode.getDataverseFeedPolicies(mdTxnCtx.getTxnId(), null, dataverseName);
+            dataverseFeedPolicies = metadataNode.getDataverseFeedPolicies(mdTxnCtx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -1034,19 +1038,19 @@
     @Override
     public void dropExternalFile(MetadataTransactionContext ctx, ExternalFile externalFile) throws AlgebricksException {
         try {
-            metadataNode.dropExternalFile(ctx.getTxnId(), null, externalFile.getDataverseName(),
-                    externalFile.getDatasetName(), externalFile.getFileNumber());
+            metadataNode.dropExternalFile(ctx.getTxnId(), externalFile.getDatabaseName(),
+                    externalFile.getDataverseName(), externalFile.getDatasetName(), externalFile.getFileNumber());
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
     @Override
-    public ExternalFile getExternalFile(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName,
-            Integer fileNumber) throws AlgebricksException {
+    public ExternalFile getExternalFile(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datasetName, Integer fileNumber) throws AlgebricksException {
         ExternalFile file;
         try {
-            file = metadataNode.getExternalFile(ctx.getTxnId(), null, dataverseName, datasetName, fileNumber);
+            file = metadataNode.getExternalFile(ctx.getTxnId(), database, dataverseName, datasetName, fileNumber);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
@@ -1063,30 +1067,30 @@
     }
 
     @Override
-    public void dropSynonym(MetadataTransactionContext ctx, DataverseName dataverseName, String synonymName)
-            throws AlgebricksException {
+    public void dropSynonym(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String synonymName) throws AlgebricksException {
         try {
-            metadataNode.dropSynonym(ctx.getTxnId(), null, dataverseName, synonymName);
+            metadataNode.dropSynonym(ctx.getTxnId(), database, dataverseName, synonymName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
     @Override
-    public Synonym getSynonym(MetadataTransactionContext ctx, DataverseName dataverseName, String synonymName)
-            throws AlgebricksException {
+    public Synonym getSynonym(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String synonymName) throws AlgebricksException {
         try {
-            return metadataNode.getSynonym(ctx.getTxnId(), null, dataverseName, synonymName);
+            return metadataNode.getSynonym(ctx.getTxnId(), database, dataverseName, synonymName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
     @Override
-    public List<Synonym> getDataverseSynonyms(MetadataTransactionContext ctx, DataverseName dataverseName)
-            throws AlgebricksException {
+    public List<Synonym> getDataverseSynonyms(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName) throws AlgebricksException {
         try {
-            return metadataNode.getDataverseSynonyms(ctx.getTxnId(), null, dataverseName);
+            return metadataNode.getDataverseSynonyms(ctx.getTxnId(), database, dataverseName);
         } catch (RemoteException e) {
             throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 519a29d..b8d1203 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -1890,12 +1890,11 @@
         return createTuple(dataverseName.getCanonicalForm(), rest);
     }
 
-    private static ITupleReference createTuple(String databaseName, DataverseName dataverseName, String... rest) {
-        //TODO(DB): pass mdIndexesProvider and use it instead of checking for null
-        if (databaseName == null) {
-            return createTuple(dataverseName.getCanonicalForm(), rest);
-        } else {
+    private ITupleReference createTuple(String databaseName, DataverseName dataverseName, String... rest) {
+        if (mdIndexesProvider.isUsingDatabase()) {
             return createDatabaseTuple(databaseName, dataverseName, rest);
+        } else {
+            return createTuple(dataverseName.getCanonicalForm(), rest);
         }
     }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 5ff8a03..a109785 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -122,7 +122,8 @@
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
      */
-    Dataverse getDataverse(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
+    Dataverse getDataverse(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
+            throws AlgebricksException;
 
     /**
      * Retrieves all datasets belonging to the given dataverse.
@@ -135,7 +136,7 @@
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
      */
-    List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, DataverseName dataverseName)
+    List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException;
 
     /**
@@ -149,7 +150,8 @@
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
      */
-    void dropDataverse(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
+    void dropDataverse(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
+            throws AlgebricksException;
 
     /**
      * Returns {@code true} if the dataverse with given name is not empty
@@ -159,7 +161,8 @@
      * @param dataverseName
      *            Name of the dataverse.
      */
-    boolean isDataverseNotEmpty(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
+    boolean isDataverseNotEmpty(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
+            throws AlgebricksException;
 
     /**
      * Inserts a new dataset into the metadata.
@@ -186,7 +189,7 @@
      * @throws AlgebricksException
      *             For example, if the dataset does not exist.
      */
-    Dataset getDataset(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName)
+    Dataset getDataset(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String datasetName)
             throws AlgebricksException;
 
     /**
@@ -202,8 +205,8 @@
      * @throws AlgebricksException
      *             For example, if the dataset and/or dataverse does not exist.
      */
-    List<Index> getDatasetIndexes(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName)
-            throws AlgebricksException;
+    List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datasetName) throws AlgebricksException;
 
     /**
      * Deletes the dataset with given name, and all it's associated indexes.
@@ -219,8 +222,8 @@
      * @throws AlgebricksException
      *             For example, if the dataset and/or dataverse does not exist.
      */
-    void dropDataset(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName, boolean force)
-            throws AlgebricksException;
+    void dropDataset(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String datasetName,
+            boolean force) throws AlgebricksException;
 
     /**
      * Inserts an index into the metadata. The index itself knows its name, and
@@ -250,8 +253,8 @@
      * @throws AlgebricksException
      *             For example, if the index does not exist.
      */
-    Index getIndex(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName, String indexName)
-            throws AlgebricksException;
+    Index getIndex(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String datasetName,
+            String indexName) throws AlgebricksException;
 
     /**
      * Deletes the index with given name, in given dataverse and dataset.
@@ -267,8 +270,8 @@
      * @throws AlgebricksException
      *             For example, if the index does not exist.
      */
-    void dropIndex(MetadataTransactionContext ctx, DataverseName dataverseName, String datasetName, String indexName)
-            throws AlgebricksException;
+    void dropIndex(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String datasetName,
+            String indexName) throws AlgebricksException;
 
     /**
      * Inserts a datatype.
@@ -295,8 +298,8 @@
      * @throws AlgebricksException
      *             For example, if the datatype does not exist.
      */
-    Datatype getDatatype(MetadataTransactionContext ctx, DataverseName dataverseName, String datatypeName)
-            throws AlgebricksException;
+    Datatype getDatatype(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String datatypeName) throws AlgebricksException;
 
     /**
      * Deletes the given datatype in given dataverse.
@@ -311,7 +314,7 @@
      *             For example, if there are still datasets using the type to be
      *             deleted.
      */
-    void dropDatatype(MetadataTransactionContext ctx, DataverseName dataverseName, String datatypeName)
+    void dropDatatype(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String datatypeName)
             throws AlgebricksException;
 
     /**
@@ -418,7 +421,7 @@
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
      */
-    List<Function> getDataverseFunctions(MetadataTransactionContext ctx, DataverseName dataverseName)
+    List<Function> getDataverseFunctions(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException;
 
     /**
@@ -442,8 +445,8 @@
      * @throws AlgebricksException
      *              For example, if the filter doesn't exist
      */
-    FullTextFilterMetadataEntity getFullTextFilter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String filterName) throws AlgebricksException;
+    FullTextFilterMetadataEntity getFullTextFilter(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String filterName) throws AlgebricksException;
 
     /**
      * @param mdTxnCtx
@@ -455,8 +458,8 @@
      * @throws AlgebricksException
      *              For example, if ifExists is set to false and the filter doesn't exist
      */
-    void dropFullTextFilter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String filterName)
-            throws AlgebricksException;
+    void dropFullTextFilter(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String filterName) throws AlgebricksException;
 
     /**
      * @param mdTxnCtx
@@ -480,8 +483,8 @@
      *              For example, if the full-text config doesn't exist
      * @return
      */
-    FullTextConfigMetadataEntity getFullTextConfig(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String configName) throws AlgebricksException;
+    FullTextConfigMetadataEntity getFullTextConfig(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String configName) throws AlgebricksException;
 
     /**
      * @param mdTxnCtx
@@ -493,8 +496,8 @@
      * @throws AlgebricksException
      *              For example, if ifExists is set to false and the config doesn't exist
      */
-    void dropFullTextConfig(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String configName)
-            throws AlgebricksException;
+    void dropFullTextConfig(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String configName) throws AlgebricksException;
 
     /**
      * @param mdTxnCtx
@@ -515,8 +518,8 @@
      *            name of the adapter
      * @throws AlgebricksException
      */
-    DatasourceAdapter getAdapter(MetadataTransactionContext ctx, DataverseName dataverseName, String name)
-            throws AlgebricksException;
+    DatasourceAdapter getAdapter(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String name) throws AlgebricksException;
 
     /**
      * @param ctx
@@ -527,7 +530,7 @@
      *            name of the adapter
      * @throws AlgebricksException
      */
-    void dropAdapter(MetadataTransactionContext ctx, DataverseName dataverseName, String name)
+    void dropAdapter(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String name)
             throws AlgebricksException;
 
     /**
@@ -537,8 +540,8 @@
      *            the dataverse whose associated adapters are being requested
      * @throws AlgebricksException
      */
-    List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext ctx, DataverseName dataverseName)
-            throws AlgebricksException;
+    List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext ctx, String database,
+            DataverseName dataverseName) throws AlgebricksException;
 
     /**
      * @param ctx
@@ -554,8 +557,8 @@
      * @return
      * @throws AlgebricksException
      */
-    CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, DataverseName dataverse, String policyName)
-            throws AlgebricksException;
+    CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, String database, DataverseName dataverse,
+            String policyName) throws AlgebricksException;
 
     /**
      * @param ctx
@@ -571,10 +574,11 @@
      * @return
      * @throws AlgebricksException
      */
-    Feed getFeed(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName)
+    Feed getFeed(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String feedName)
             throws AlgebricksException;
 
-    List<Feed> getFeeds(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
+    List<Feed> getFeeds(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
+            throws AlgebricksException;
 
     /**
      * @param ctx
@@ -582,7 +586,8 @@
      * @param feedName
      * @throws AlgebricksException
      */
-    void dropFeed(MetadataTransactionContext ctx, DataverseName dataverse, String feedName) throws AlgebricksException;
+    void dropFeed(MetadataTransactionContext ctx, String database, DataverseName dataverse, String feedName)
+            throws AlgebricksException;
 
     /**
      * @param ctx
@@ -597,7 +602,7 @@
      * @param policyName
      * @throws AlgebricksException
      */
-    void dropFeedPolicy(MetadataTransactionContext ctx, DataverseName dataverseName, String policyName)
+    void dropFeedPolicy(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String policyName)
             throws AlgebricksException;
 
     /**
@@ -607,11 +612,11 @@
      * @return
      * @throws AlgebricksException
      */
-    FeedPolicyEntity getFeedPolicy(MetadataTransactionContext ctx, DataverseName dataverseName, String policyName)
-            throws AlgebricksException;
+    FeedPolicyEntity getFeedPolicy(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String policyName) throws AlgebricksException;
 
-    List<FeedPolicyEntity> getDataverseFeedPolicies(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName)
-            throws AlgebricksException;
+    List<FeedPolicyEntity> getDataverseFeedPolicies(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName) throws AlgebricksException;
 
     void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws AlgebricksException;
 
@@ -630,7 +635,7 @@
      *            the library does not exists.
      * @throws AlgebricksException
      */
-    void dropLibrary(MetadataTransactionContext ctx, DataverseName dataverseName, String libraryName)
+    void dropLibrary(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String libraryName)
             throws AlgebricksException;
 
     /**
@@ -656,7 +661,7 @@
      * @throws AlgebricksException
      * @throws RemoteException
      */
-    Library getLibrary(MetadataTransactionContext ctx, DataverseName dataverseName, String libraryName)
+    Library getLibrary(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String libraryName)
             throws AlgebricksException, RemoteException;
 
     /**
@@ -669,7 +674,7 @@
      * @return Library
      * @throws AlgebricksException
      */
-    List<Library> getDataverseLibraries(MetadataTransactionContext ctx, DataverseName dataverseName)
+    List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException;
 
     /**
@@ -723,8 +728,8 @@
      * @return
      * @throws AlgebricksException
      */
-    ExternalFile getExternalFile(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String datasetName,
-            Integer fileNumber) throws AlgebricksException;
+    ExternalFile getExternalFile(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String datasetName, Integer fileNumber) throws AlgebricksException;
 
     /**
      * Adds a synonym, acquiring local locks on behalf of the given transaction id.
@@ -750,7 +755,7 @@
      *            the synonym does not exists.
      * @throws AlgebricksException
      */
-    void dropSynonym(MetadataTransactionContext ctx, DataverseName dataverseName, String synonymName)
+    void dropSynonym(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String synonymName)
             throws AlgebricksException;
 
     /**
@@ -763,7 +768,7 @@
      * @return Library
      * @throws AlgebricksException
      */
-    Synonym getSynonym(MetadataTransactionContext ctx, DataverseName dataverseName, String synonymName)
+    Synonym getSynonym(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String synonymName)
             throws AlgebricksException;
 
     /**
@@ -776,7 +781,7 @@
      * @return list of synonyms
      * @throws AlgebricksException
      */
-    List<Synonym> getDataverseSynonyms(MetadataTransactionContext ctx, DataverseName dataverseName)
+    List<Synonym> getDataverseSynonyms(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException;
 
     /**
@@ -875,14 +880,14 @@
      */
     void addFeedConnection(MetadataTransactionContext ctx, FeedConnection feedConnection) throws AlgebricksException;
 
-    void dropFeedConnection(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName,
-            String datasetName) throws AlgebricksException;
+    void dropFeedConnection(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String feedName, String datasetName) throws AlgebricksException;
 
-    FeedConnection getFeedConnection(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName,
-            String datasetName) throws AlgebricksException;
+    FeedConnection getFeedConnection(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String feedName, String datasetName) throws AlgebricksException;
 
-    List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, DataverseName dataverseName, String feedName)
-            throws AlgebricksException;
+    List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, String database, DataverseName dataverseName,
+            String feedName) throws AlgebricksException;
 
     long getMaxTxnId();
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 514dda4..1e84287 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -289,7 +289,7 @@
      */
     private static void insertNewCompactionPoliciesIfNotExist(MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException {
-        if (MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 ConcurrentMergePolicyFactory.NAME) == null) {
             CompactionPolicy compactionPolicy = getCompactionPolicyEntity(ConcurrentMergePolicyFactory.class.getName());
             MetadataManager.INSTANCE.addCompactionPolicy(mdTxnCtx, compactionPolicy);
@@ -299,12 +299,12 @@
     private static void insertSynonymEntitiesIfNotExist(MetadataTransactionContext mdTxnCtx,
             MetadataIndexesProvider mdIndexesProvider) throws AlgebricksException {
         IAType synonymDatasetRecordType = mdIndexesProvider.getSynonymEntity().getRecordType();
-        if (MetadataManager.INSTANCE.getDatatype(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 synonymDatasetRecordType.getTypeName()) == null) {
             MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
                     synonymDatasetRecordType.getTypeName(), synonymDatasetRecordType, false));
         }
-        if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 MetadataConstants.SYNONYM_DATASET_NAME) == null) {
             insertMetadataDatasets(mdTxnCtx, new IMetadataIndex[] { mdIndexesProvider.getSynonymEntity().getIndex() });
         }
@@ -320,24 +320,24 @@
         // We need to insert data types first because datasets depend on data types
         // ToDo: create a new function to reduce duplicated code here: addDatatypeIfNotExist()
         IAType fullTextConfigRecordType = metadataIndexesProvider.getFullTextConfigEntity().getRecordType();
-        if (MetadataManager.INSTANCE.getDatatype(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 fullTextConfigRecordType.getTypeName()) == null) {
             MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
                     fullTextConfigRecordType.getTypeName(), fullTextConfigRecordType, false));
         }
         IAType fullTextFilterRecordType = metadataIndexesProvider.getFullTextFilterEntity().getRecordType();
-        if (MetadataManager.INSTANCE.getDatatype(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getDatatype(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 fullTextFilterRecordType.getTypeName()) == null) {
             MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(MetadataConstants.METADATA_DATAVERSE_NAME,
                     fullTextFilterRecordType.getTypeName(), fullTextFilterRecordType, false));
         }
 
-        if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 MetadataConstants.FULL_TEXT_CONFIG_DATASET_NAME) == null) {
             insertMetadataDatasets(mdTxnCtx,
                     new IMetadataIndex[] { metadataIndexesProvider.getFullTextConfigEntity().getIndex() });
         }
-        if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+        if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
                 MetadataConstants.FULL_TEXT_FILTER_DATASET_NAME) == null) {
             insertMetadataDatasets(mdTxnCtx,
                     new IMetadataIndex[] { metadataIndexesProvider.getFullTextFilterEntity().getIndex() });
@@ -481,19 +481,19 @@
             throws AlgebricksException {
         if (dataverse.getPendingOp() != MetadataUtil.PENDING_NO_OP) {
             // drop pending dataverse
-            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverse.getDataverseName());
+            MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverse.getDatabaseName(), dataverse.getDataverseName());
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("Dropped a pending dataverse: " + dataverse.getDataverseName());
             }
         } else {
-            List<Dataset> datasets =
-                    MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName());
+            List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                    dataverse.getDatabaseName(), dataverse.getDataverseName());
             for (Dataset dataset : datasets) {
                 recoverDataset(mdTxnCtx, dataset);
             }
 
-            List<Library> libraries =
-                    MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverse.getDataverseName());
+            List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx,
+                    dataverse.getDatabaseName(), dataverse.getDataverseName());
             for (Library library : libraries) {
                 recoverLibrary(mdTxnCtx, library);
             }
@@ -508,19 +508,20 @@
         }
         if (dataset.getPendingOp() != MetadataUtil.PENDING_NO_OP) {
             // drop pending dataset
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), true);
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataset.getDatabaseName(), dataset.getDataverseName(),
+                    dataset.getDatasetName(), true);
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info(
                         "Dropped a pending dataset: " + dataset.getDataverseName() + "." + dataset.getDatasetName());
             }
         } else {
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName());
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataset.getDatabaseName(),
+                    dataset.getDataverseName(), dataset.getDatasetName());
             for (Index index : indexes) {
                 if (index.getPendingOp() != MetadataUtil.PENDING_NO_OP) {
                     // drop pending index
-                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(),
-                            index.getIndexName());
+                    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataset.getDatabaseName(), dataset.getDataverseName(),
+                            dataset.getDatasetName(), index.getIndexName());
                     if (LOGGER.isInfoEnabled()) {
                         LOGGER.info("Dropped a pending index: " + dataset.getDataverseName() + "."
                                 + dataset.getDatasetName() + "." + index.getIndexName());
@@ -530,8 +531,8 @@
         }
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             // if the dataset has no indexes, delete all its files
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName());
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataset.getDatabaseName(),
+                    dataset.getDataverseName(), dataset.getDatasetName());
             if (indexes.isEmpty()) {
                 List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
                 for (ExternalFile file : files) {
@@ -549,7 +550,8 @@
             throws AlgebricksException {
         if (library.getPendingOp() != MetadataUtil.PENDING_NO_OP) {
             // drop pending library
-            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, library.getDataverseName(), library.getName());
+            MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, library.getDatabaseName(), library.getDataverseName(),
+                    library.getName());
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("Dropped a pending library: " + library.getDataverseName() + "." + library.getName());
             }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index ebf4204..e5842dc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -125,9 +125,9 @@
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
             IProjectionFiltrationInfo projectionFiltrationInfo) throws AlgebricksException {
         String itemTypeName = dataset.getItemTypeName();
-        IAType itemType = MetadataManager.INSTANCE
-                .getDatatype(metadataProvider.getMetadataTxnContext(), dataset.getItemTypeDataverseName(), itemTypeName)
-                .getDatatype();
+        String itemTypeDatabase = null;
+        IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+                itemTypeDatabase, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
         switch (dataset.getDatasetType()) {
             case EXTERNAL:
                 DatasetDataSource externalDataSource = (DatasetDataSource) dataSource;
@@ -148,16 +148,18 @@
                         tupleFilterFactory, outputLimit);
             case INTERNAL:
                 DataSourceId id = getId();
+                String database = null;
                 DataverseName dataverseName = id.getDataverseName();
                 String datasetName = id.getDatasourceName();
                 Index primaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                        dataverseName, datasetName, datasetName);
+                        database, dataverseName, datasetName, datasetName);
 
                 ARecordType datasetType = (ARecordType) itemType;
                 ARecordType metaItemType = null;
                 if (dataset.hasMetaPart()) {
+                    String metaItemTypeDatabase = null;
                     metaItemType = (ARecordType) MetadataManager.INSTANCE
-                            .getDatatype(metadataProvider.getMetadataTxnContext(),
+                            .getDatatype(metadataProvider.getMetadataTxnContext(), metaItemTypeDatabase,
                                     dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
                             .getDatatype();
                 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index b976448..152ace9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.metadata.MetadataManager;
@@ -60,7 +61,7 @@
 
     public static IAType findType(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String typeName)
             throws AlgebricksException {
-        Datatype type = findTypeEntity(mdTxnCtx, dataverseName, typeName);
+        Datatype type = findTypeEntity(mdTxnCtx, null, dataverseName, typeName);
         return type != null ? type.getDatatype() : null;
     }
 
@@ -73,7 +74,7 @@
      * @param metaItemType record type of the meta part of the dataset
      * @param dataset      the actual dataset
      * @return type computed from primary keys if dataset without type spec, otherwise the original itemType itself
-     * @throws AlgebricksException
+     * @throws AlgebricksException AlgebricksException
      */
     public static IAType findTypeForDatasetWithoutType(IAType itemType, IAType metaItemType, Dataset dataset)
             throws AlgebricksException {
@@ -91,12 +92,12 @@
         return ProjectionFiltrationTypeUtil.getRecordTypeWithFieldTypes(primaryKeys, primaryKeyTypes);
     }
 
-    public static Datatype findTypeEntity(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String typeName) throws AlgebricksException {
+    public static Datatype findTypeEntity(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String typeName) throws AlgebricksException {
         if (dataverseName == null || typeName == null) {
             return null;
         }
-        Datatype type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
+        Datatype type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, database, dataverseName, typeName);
         if (type == null) {
             throw new AsterixException(ErrorCode.UNKNOWN_TYPE, dataverseName + "." + typeName);
         }
@@ -109,31 +110,34 @@
             return null;
         }
         if (dataverseName == null) {
-            throw new AlgebricksException("Cannot declare output-record-type with no " + dataverse());
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR,
+                    "Cannot declare output-record-type with no " + dataverse());
         }
         IAType type = findType(mdTxnCtx, dataverseName, outputRecordType);
         if (!(type instanceof ARecordType)) {
-            throw new AlgebricksException("Type " + outputRecordType + " is not a record type!");
+            throw new CompilationException(ErrorCode.COMPILATION_ERROR,
+                    "Type " + outputRecordType + " is not a record type!");
         }
         return (ARecordType) type;
     }
 
-    public static DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String adapterName) throws AlgebricksException {
+    public static DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String adapterName) throws AlgebricksException {
         DatasourceAdapter adapter;
         // search in default namespace (built-in adapter)
-        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
+                adapterName);
 
         // search in dataverse (user-defined adapter)
         if (adapter == null) {
-            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, database, dataverseName, adapterName);
         }
         return adapter;
     }
 
-    public static Dataset findDataset(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
+    public static Dataset findDataset(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
             String datasetName, boolean includingViews) throws AlgebricksException {
-        Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+        Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, database, dataverseName, datasetName);
         if (!includingViews && dataset != null && dataset.getDatasetType() == DatasetConfig.DatasetType.VIEW) {
             return null;
         }
@@ -142,7 +146,7 @@
 
     public static Dataset findDataset(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
             String datasetName) throws AlgebricksException {
-        return findDataset(mdTxnCtx, dataverseName, datasetName, false);
+        return findDataset(mdTxnCtx, null, dataverseName, datasetName, false);
     }
 
     public static Dataset findExistingDataset(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
@@ -172,44 +176,44 @@
         return MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName).getNodeNames();
     }
 
-    public static Feed findFeed(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, String feedName)
-            throws AlgebricksException {
-        return MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
+    public static Feed findFeed(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
+            String feedName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getFeed(mdTxnCtx, database, dataverseName, feedName);
     }
 
-    public static FeedConnection findFeedConnection(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String feedName, String datasetName) throws AlgebricksException {
-        return MetadataManager.INSTANCE.getFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
+    public static FeedConnection findFeedConnection(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String feedName, String datasetName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getFeedConnection(mdTxnCtx, database, dataverseName, feedName, datasetName);
     }
 
-    public static FeedPolicyEntity findFeedPolicy(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String policyName) throws AlgebricksException {
-        return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
+    public static FeedPolicyEntity findFeedPolicy(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String policyName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, database, dataverseName, policyName);
     }
 
-    public static Synonym findSynonym(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
+    public static Synonym findSynonym(MetadataTransactionContext mdTxnCtx, String database, DataverseName dataverseName,
             String synonymName) throws AlgebricksException {
-        return MetadataManager.INSTANCE.getSynonym(mdTxnCtx, dataverseName, synonymName);
+        return MetadataManager.INSTANCE.getSynonym(mdTxnCtx, database, dataverseName, synonymName);
     }
 
     public static FullTextConfigMetadataEntity findFullTextConfigDescriptor(MetadataTransactionContext mdTxnCtx,
-            DataverseName dataverseName, String ftConfigName) throws AlgebricksException {
+            String database, DataverseName dataverseName, String ftConfigName) throws AlgebricksException {
         // If the config name is null, then the default config will be returned
         if (Strings.isNullOrEmpty(ftConfigName)) {
             return FullTextConfigMetadataEntity.getDefaultFullTextConfigMetadataEntity();
         }
 
-        return MetadataManager.INSTANCE.getFullTextConfig(mdTxnCtx, dataverseName, ftConfigName);
+        return MetadataManager.INSTANCE.getFullTextConfig(mdTxnCtx, database, dataverseName, ftConfigName);
     }
 
     public static FullTextFilterMetadataEntity findFullTextFilterDescriptor(MetadataTransactionContext mdTxnCtx,
-            DataverseName dataverseName, String ftFilterName) throws AlgebricksException {
-        return MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, dataverseName, ftFilterName);
+            String database, DataverseName dataverseName, String ftFilterName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getFullTextFilter(mdTxnCtx, database, dataverseName, ftFilterName);
     }
 
-    public static List<Index> getDatasetIndexes(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
-            String datasetName) throws AlgebricksException {
-        return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+    public static List<Index> getDatasetIndexes(MetadataTransactionContext mdTxnCtx, String database,
+            DataverseName dataverseName, String datasetName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, database, dataverseName, datasetName);
     }
 
     public static DataSource findDataSource(IClusterStateManager clusterStateManager,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 19b69f4..42162b3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -356,7 +356,7 @@
         }
         appCtx.getMetadataLockManager().acquireDataverseReadLock(locks, dvName);
         appCtx.getMetadataLockManager().acquireDatasetReadLock(locks, dvName, datasetName);
-        return MetadataManagerUtil.findDataset(mdTxnCtx, dvName, datasetName, includingViews);
+        return MetadataManagerUtil.findDataset(mdTxnCtx, null, dvName, datasetName, includingViews);
     }
 
     public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
@@ -368,7 +368,7 @@
     }
 
     public Datatype findTypeEntity(DataverseName dataverseName, String typeName) throws AlgebricksException {
-        return MetadataManagerUtil.findTypeEntity(mdTxnCtx, dataverseName, typeName);
+        return MetadataManagerUtil.findTypeEntity(mdTxnCtx, null, dataverseName, typeName);
     }
 
     public IAType findTypeForDatasetWithoutType(IAType recordType, IAType metaRecordType, Dataset dataset)
@@ -389,16 +389,16 @@
     }
 
     public Feed findFeed(DataverseName dataverseName, String feedName) throws AlgebricksException {
-        return MetadataManagerUtil.findFeed(mdTxnCtx, dataverseName, feedName);
+        return MetadataManagerUtil.findFeed(mdTxnCtx, null, dataverseName, feedName);
     }
 
     public FeedConnection findFeedConnection(DataverseName dataverseName, String feedName, String datasetName)
             throws AlgebricksException {
-        return MetadataManagerUtil.findFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
+        return MetadataManagerUtil.findFeedConnection(mdTxnCtx, null, dataverseName, feedName, datasetName);
     }
 
     public FeedPolicyEntity findFeedPolicy(DataverseName dataverseName, String policyName) throws AlgebricksException {
-        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverseName, policyName);
+        return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, null, dataverseName, policyName);
     }
 
     @Override
@@ -424,11 +424,11 @@
 
     public Index getIndex(DataverseName dataverseName, String datasetName, String indexName)
             throws AlgebricksException {
-        return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
+        return MetadataManager.INSTANCE.getIndex(mdTxnCtx, null, dataverseName, datasetName, indexName);
     }
 
     public List<Index> getDatasetIndexes(DataverseName dataverseName, String datasetName) throws AlgebricksException {
-        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
+        return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, null, dataverseName, datasetName);
     }
 
     public Index findSampleIndex(DataverseName dataverseName, String datasetName) throws AlgebricksException {
@@ -448,7 +448,7 @@
             return null;
         }
         Synonym synonym = null;
-        while (MetadataManagerUtil.findDataset(mdTxnCtx, dvName, datasetName, includingViews) == null) {
+        while (MetadataManagerUtil.findDataset(mdTxnCtx, null, dvName, datasetName, includingViews) == null) {
             synonym = findSynonym(dvName, datasetName);
             if (synonym == null) {
                 return null;
@@ -460,17 +460,17 @@
     }
 
     public Synonym findSynonym(DataverseName dataverseName, String synonymName) throws AlgebricksException {
-        return MetadataManagerUtil.findSynonym(mdTxnCtx, dataverseName, synonymName);
+        return MetadataManagerUtil.findSynonym(mdTxnCtx, null, dataverseName, synonymName);
     }
 
     public FullTextConfigMetadataEntity findFullTextConfig(DataverseName dataverseName, String ftConfigName)
             throws AlgebricksException {
-        return MetadataManagerUtil.findFullTextConfigDescriptor(mdTxnCtx, dataverseName, ftConfigName);
+        return MetadataManagerUtil.findFullTextConfigDescriptor(mdTxnCtx, null, dataverseName, ftConfigName);
     }
 
     public FullTextFilterMetadataEntity findFullTextFilter(DataverseName dataverseName, String ftFilterName)
             throws AlgebricksException {
-        return MetadataManagerUtil.findFullTextFilterDescriptor(mdTxnCtx, dataverseName, ftFilterName);
+        return MetadataManagerUtil.findFullTextFilterDescriptor(mdTxnCtx, null, dataverseName, ftFilterName);
     }
 
     @Override
@@ -509,7 +509,7 @@
     }
 
     public Dataverse findDataverse(DataverseName dataverseName) throws AlgebricksException {
-        return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
+        return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, null, dataverseName);
     }
 
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> getFeedIntakeRuntime(
@@ -549,13 +549,13 @@
             boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
             boolean partitionInputTuples) throws AlgebricksException {
         boolean isSecondary = true;
-        Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), dataset.getDatasetName());
+        Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName());
         if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
             isSecondary = !indexName.equals(primaryIndex.getIndexName());
         }
-        Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName) : primaryIndex;
+        Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), indexName) : primaryIndex;
 
         int numSecondaryKeys;
         switch (theIndex.getIndexType()) {
@@ -641,8 +641,8 @@
             boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
             int[] maxFilterFieldIndexes, boolean isIndexOnlyPlan) throws AlgebricksException {
         int numPrimaryKeys = dataset.getPrimaryKeys().size();
-        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName);
+        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         if (secondaryIndex == null) {
             throw new AlgebricksException("Code generation error: no index " + indexName + " for " + dataset() + " "
                     + dataset.getDatasetName());
@@ -861,8 +861,8 @@
         }
 
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName);
+        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         // TokenizeOperator only supports a keyword or n-gram index.
         switch (secondaryIndex.getIndexType()) {
             case SINGLE_PARTITION_WORD_INVIX:
@@ -936,11 +936,12 @@
             String adapterName) throws AlgebricksException {
         DatasourceAdapter adapter;
         // search in default namespace (built-in adapter)
-        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
+                adapterName);
 
         // search in dataverse (user-defined adapter)
         if (adapter == null) {
-            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
+            adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null, dataverseName, adapterName);
         }
         return adapter;
     }
@@ -1029,8 +1030,8 @@
             filterFields[0] = idx;
         }
 
-        Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), dataset.getDatasetName());
+        Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName());
         PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
 
         // prepare callback
@@ -1059,8 +1060,8 @@
                 ISearchOperationCallbackFactory searchCallbackFactory = dataset
                         .getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
 
-                Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE
-                        .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).stream()
+                Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+                        dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName()).stream()
                         .filter(Index::isPrimaryKeyIndex).findFirst();
                 IIndexDataflowHelperFactory pkidfh = null;
                 if (primaryKeyIndex.isPresent()) {
@@ -1116,8 +1117,8 @@
         String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
 
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
-        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName);
+        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), indexName);
 
         ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
         if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) {
@@ -1235,8 +1236,8 @@
         }
         try {
             // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                    dataset.getDataverseName(), dataset.getDatasetName(), indexName);
             PartitioningProperties partitioningProperties =
                     getPartitioningProperties(dataset, secondaryIndex.getIndexName());
             // prepare callback
@@ -1304,8 +1305,8 @@
 
         try {
             // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                    dataset.getDataverseName(), dataset.getDatasetName(), indexName);
 
             PartitioningProperties partitioningProperties =
                     getPartitioningProperties(dataset, secondaryIndex.getIndexName());
@@ -1346,12 +1347,14 @@
             throws AlgebricksException {
         Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
         String itemTypeName = dataset.getItemTypeName();
+        String itemTypeDatabase = null;
         IAType itemType = MetadataManager.INSTANCE
-                .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+                .getDatatype(mdTxnCtx, itemTypeDatabase, dataset.getItemTypeDataverseName(), itemTypeName)
+                .getDatatype();
         validateRecordType(itemType);
         ARecordType recType = (ARecordType) itemType;
-        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName);
+        Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails();
         List<List<String>> secondaryKeyExprs = secondaryIndexDetails.getKeyFieldNames();
         List<IAType> secondaryKeyTypes = secondaryIndexDetails.getKeyFieldTypes();
@@ -1528,8 +1531,8 @@
         }
         try {
             // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                    dataset.getDataverseName(), dataset.getDatasetName(), indexName);
 
             PartitioningProperties partitioningProperties =
                     getPartitioningProperties(dataset, secondaryIndex.getIndexName());
@@ -1654,7 +1657,9 @@
         String itemTypeName = dataset.getItemTypeName();
         IAType itemType;
         try {
-            itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
+            String itemTypeDatabase = null;
+            itemType = MetadataManager.INSTANCE
+                    .getDatatype(mdTxnCtx, itemTypeDatabase, dataset.getItemTypeDataverseName(), itemTypeName)
                     .getDatatype();
 
             if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1664,8 +1669,8 @@
             ARecordType recType = (ARecordType) itemType;
 
             // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
+            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDatabaseName(),
+                    dataset.getDataverseName(), dataset.getDatasetName(), indexName);
             Index.TextIndexDetails secondaryIndexDetails = (Index.TextIndexDetails) secondaryIndex.getIndexDetails();
 
             List<List<String>> secondaryKeyExprs = secondaryIndexDetails.getKeyFieldNames();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index b27ac38..cd8c64b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -47,7 +47,6 @@
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -87,7 +86,6 @@
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -355,11 +353,10 @@
             List<JobSpecification> jobsToExecute, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress,
             IHyracksClientConnection hcc, boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc,
             Set<DropOption> options, boolean force) throws Exception {
-        Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
         if (getDatasetType() == DatasetType.INTERNAL) {
             // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes =
-                    MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), databaseName,
+                    dataverseName, datasetName);
             for (int j = 0; j < indexes.size(); j++) {
                 if (indexes.get(j).isSecondaryIndex()) {
                     jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(indexes.get(j), metadataProvider, this, options,
@@ -368,7 +365,7 @@
             }
             jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(this, metadataProvider, options));
             // #. mark the existing dataset as PendingDropOp
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName, force);
+            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), databaseName, dataverseName, datasetName, force);
             MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
                     new Dataset(dataverseName, datasetName, getItemTypeDataverseName(), getItemTypeName(),
                             getMetaItemTypeDataverseName(), getMetaItemTypeName(), getNodeGroupName(),
@@ -380,11 +377,6 @@
             bActiveTxn.setValue(false);
             progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
 
-            // # disconnect the feeds
-            for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-                JobUtils.runJob(hcc, p.first, true);
-            }
-
             // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
                 JobUtils.runJob(hcc, jobSpec, true);
@@ -397,14 +389,18 @@
         }
 
         // #. finally, delete the dataset.
-        MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName, force);
+        MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), databaseName, dataverseName, datasetName, force);
 
         // drop inline types
+        String recordTypeDatabase = null;
         if (TypeUtil.isDatasetInlineTypeName(this, recordTypeDataverseName, recordTypeName)) {
-            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx.getValue(), recordTypeDataverseName, recordTypeName);
+            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx.getValue(), recordTypeDatabase, recordTypeDataverseName,
+                    recordTypeName);
         }
+        String metaTypeDatabase = null;
         if (hasMetaPart() && TypeUtil.isDatasetInlineTypeName(this, metaTypeDataverseName, metaTypeName)) {
-            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx.getValue(), metaTypeDataverseName, metaTypeName);
+            MetadataManager.INSTANCE.dropDatatype(mdTxnCtx.getValue(), metaTypeDatabase, metaTypeDataverseName,
+                    metaTypeName);
         }
 
         // Drops the associated nodegroup if it is no longer used by any other dataset.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 78421f2..a76f8a9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -89,7 +89,8 @@
 
     public static Feed validateIfFeedExists(DataverseName dataverseName, String feedName,
             MetadataTransactionContext ctx) throws AlgebricksException {
-        Feed feed = MetadataManager.INSTANCE.getFeed(ctx, dataverseName, feedName);
+        String database = null;
+        Feed feed = MetadataManager.INSTANCE.getFeed(ctx, database, dataverseName, feedName);
         if (feed == null) {
             throw new CompilationException("Unknown source feed: " + feedName);
         }
@@ -98,10 +99,11 @@
 
     public static FeedPolicyEntity validateIfPolicyExists(DataverseName dataverseName, String policyName,
             MetadataTransactionContext ctx) throws AlgebricksException {
-        FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, dataverseName, policyName);
+        String database = null;
+        FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, database, dataverseName, policyName);
         if (feedPolicy == null) {
-            feedPolicy =
-                    MetadataManager.INSTANCE.getFeedPolicy(ctx, MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
+            feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(ctx, null, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    policyName);
             if (feedPolicy == null) {
                 throw new CompilationException("Unknown feed policy" + policyName);
             }
@@ -121,11 +123,12 @@
             if (adapterName == null) {
                 throw new AlgebricksException("cannot find adapter name");
             }
-            DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
+            DatasourceAdapter adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null,
                     MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
             // Get adapter from metadata dataset <The feed dataverse>
             if (adapterEntity == null) {
-                adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
+                adapterEntity =
+                        MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null, feed.getDataverseName(), adapterName);
             }
             AdapterType adapterType;
             ITypedAdapterFactory adapterFactory;
@@ -177,7 +180,7 @@
             throws AlgebricksException, RemoteException, HyracksDataException {
         DataverseName libraryDataverse = adapterEntity.getLibraryDataverseName();
         String libraryName = adapterEntity.getLibraryName();
-        Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, libraryDataverse, libraryName);
+        Library library = MetadataManager.INSTANCE.getLibrary(mdTxnCtx, null, libraryDataverse, libraryName);
         if (library == null) {
             throw new CompilationException(ErrorCode.UNKNOWN_LIBRARY, libraryName);
         }
@@ -208,11 +211,12 @@
             metaType = getOutputType(feed, configuration.get(ExternalDataConstants.KEY_META_TYPE_NAME));
             ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), feed.getFeedName());
             // Get adapter from metadata dataset <Metadata dataverse>
-            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-                    adapterName);
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null,
+                    MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
             // Get adapter from metadata dataset <The feed dataverse>
             if (adapterEntity == null) {
-                adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
+                adapterEntity =
+                        MetadataManager.INSTANCE.getAdapter(mdTxnCtx, null, feed.getDataverseName(), adapterName);
             }
             if (adapterEntity != null) {
                 adapterType = adapterEntity.getType();
@@ -305,7 +309,7 @@
         MetadataTransactionContext ctx = null;
         try {
             ctx = MetadataManager.INSTANCE.beginTransaction();
-            Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, feed.getDataverseName(), fqOutputType);
+            Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, null, feed.getDataverseName(), fqOutputType);
             if (t == null || t.getDatatype().getTypeTag() != ATypeTag.OBJECT) {
                 throw new MetadataException(ErrorCode.FEED_METADATA_UTIL_UNEXPECTED_FEED_DATATYPE, fqOutputType);
             }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 44964f3..09a8a4b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -250,7 +250,7 @@
     public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
             MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
         String policyName = dataset.getCompactionPolicy();
-        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
+        CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx, null,
                 MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
         String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
         ILSMMergePolicyFactory mergePolicyFactory;