[ASTERIXDB-3259][MTD] Handle 'database' in MetadataCache

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- add getDatabase() in IMetadataNode/IMetadataManager
- default the database for FunctionSignature

Change-Id: I456c05d9f823a6524bad6ed75f618f5a95e969b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17814
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
index f3b1605..7bb538f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
@@ -23,6 +23,7 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 
@@ -92,6 +93,10 @@
         return sb.toString();
     }
 
+    public String getDatabaseName() {
+        return MetadataUtil.databaseFor(dataverseName);
+    }
+
     public DataverseName getDataverseName() {
         return dataverseName;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index a8d57ce..f054796 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -58,36 +58,38 @@
 
     protected final Map<String, Database> databases = new HashMap<>();
     // Key is dataverse name.
-    protected final Map<DataverseName, Dataverse> dataverses = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Dataverse>> dataverses = new HashMap<>();
     // Key is dataverse name. Key of value map is dataset name.
-    protected final Map<DataverseName, Map<String, Dataset>> datasets = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, Dataset>>> datasets = new HashMap<>();
     // Key is dataverse name. Key of value map is dataset name. Key of value map of value map is index name.
-    protected final Map<DataverseName, Map<String, Map<String, Index>>> indexes = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, Map<String, Index>>>> indexes = new HashMap<>();
     // Key is dataverse name. Key of value map is datatype name.
-    protected final Map<DataverseName, Map<String, Datatype>> datatypes = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, Datatype>>> datatypes = new HashMap<>();
     // Key is node group name.
     protected final Map<String, NodeGroup> nodeGroups = new HashMap<>();
     // Key is function Identifier . Key of value map is function name.
     protected final Map<FunctionSignature, Function> functions = new HashMap<>();
     // Key is adapter dataverse name. Key of value map is the adapter name
-    protected final Map<DataverseName, Map<String, DatasourceAdapter>> adapters = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, DatasourceAdapter>>> adapters = new HashMap<>();
 
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<DataverseName, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, FeedPolicyEntity>>> feedPolicies = new HashMap<>();
     // Key is library dataverse. Key of value map is the library name
-    protected final Map<DataverseName, Map<String, Library>> libraries = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, Library>>> libraries = new HashMap<>();
     // Key is library dataverse. Key of value map is the feed name
-    protected final Map<DataverseName, Map<String, Feed>> feeds = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, Feed>>> feeds = new HashMap<>();
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<DataverseName, Map<String, CompactionPolicy>> compactionPolicies = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, CompactionPolicy>>> compactionPolicies = new HashMap<>();
     // Key is DataverseName, Key of value map is feedConnectionId
-    protected final Map<DataverseName, Map<String, FeedConnection>> feedConnections = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, FeedConnection>>> feedConnections = new HashMap<>();
     // Key is synonym dataverse. Key of value map is the synonym name
-    protected final Map<DataverseName, Map<String, Synonym>> synonyms = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, Synonym>>> synonyms = new HashMap<>();
     // Key is DataverseName. Key of value map is the full-text filter name
-    protected final Map<DataverseName, Map<String, FullTextFilterMetadataEntity>> fullTextFilters = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, FullTextFilterMetadataEntity>>> fullTextFilters =
+            new HashMap<>();
     // Key is DataverseName. Key of value map is the full-text config name
-    protected final Map<DataverseName, Map<String, FullTextConfigMetadataEntity>> fullTextConfigs = new HashMap<>();
+    protected final Map<String, Map<DataverseName, Map<String, FullTextConfigMetadataEntity>>> fullTextConfigs =
+            new HashMap<>();
 
     // Atomically executes all metadata operations in ctx's log.
     public void commit(MetadataTransactionContext ctx) {
@@ -116,30 +118,33 @@
     }
 
     public void clear() {
-        synchronized (dataverses) {
-            synchronized (nodeGroups) {
-                synchronized (datasets) {
-                    synchronized (indexes) {
-                        synchronized (datatypes) {
-                            synchronized (functions) {
-                                synchronized (fullTextConfigs) {
-                                    synchronized (fullTextFilters) {
-                                        synchronized (adapters) {
-                                            synchronized (libraries) {
-                                                synchronized (compactionPolicies) {
-                                                    synchronized (synonyms) {
-                                                        dataverses.clear();
-                                                        nodeGroups.clear();
-                                                        datasets.clear();
-                                                        indexes.clear();
-                                                        datatypes.clear();
-                                                        functions.clear();
-                                                        fullTextConfigs.clear();
-                                                        fullTextFilters.clear();
-                                                        adapters.clear();
-                                                        libraries.clear();
-                                                        compactionPolicies.clear();
-                                                        synonyms.clear();
+        synchronized (databases) {
+            synchronized (dataverses) {
+                synchronized (nodeGroups) {
+                    synchronized (datasets) {
+                        synchronized (indexes) {
+                            synchronized (datatypes) {
+                                synchronized (functions) {
+                                    synchronized (fullTextConfigs) {
+                                        synchronized (fullTextFilters) {
+                                            synchronized (adapters) {
+                                                synchronized (libraries) {
+                                                    synchronized (compactionPolicies) {
+                                                        synchronized (synonyms) {
+                                                            databases.clear();
+                                                            dataverses.clear();
+                                                            nodeGroups.clear();
+                                                            datasets.clear();
+                                                            indexes.clear();
+                                                            datatypes.clear();
+                                                            functions.clear();
+                                                            fullTextConfigs.clear();
+                                                            fullTextFilters.clear();
+                                                            adapters.clear();
+                                                            libraries.clear();
+                                                            compactionPolicies.clear();
+                                                            synonyms.clear();
+                                                        }
                                                     }
                                                 }
                                             }
@@ -168,12 +173,25 @@
         synchronized (dataverses) {
             synchronized (datasets) {
                 synchronized (datatypes) {
+                    String databaseName = dataverse.getDatabaseName();
+                    Map<DataverseName, Dataverse> databaseDataverses =
+                            dataverses.computeIfAbsent(databaseName, k -> new HashMap<>());
                     DataverseName dataverseName = dataverse.getDataverseName();
-                    if (!dataverses.containsKey(dataverseName)) {
-                        datasets.put(dataverseName, new HashMap<>());
-                        datatypes.put(dataverseName, new HashMap<>());
-                        adapters.put(dataverseName, new HashMap<>());
-                        return dataverses.put(dataverseName, dataverse);
+                    if (!databaseDataverses.containsKey(dataverseName)) {
+                        //TODO(DB): why are we clearing 'datasets', 'datatypes' and 'adapters'? should it be done for db
+                        Map<DataverseName, Map<String, Dataset>> dataverseDatasets =
+                                datasets.computeIfAbsent(databaseName, k -> new HashMap<>());
+                        dataverseDatasets.put(dataverseName, new HashMap<>());
+
+                        Map<DataverseName, Map<String, Datatype>> dataverseDatatypes =
+                                datatypes.computeIfAbsent(databaseName, k -> new HashMap<>());
+                        dataverseDatatypes.put(dataverseName, new HashMap<>());
+
+                        Map<DataverseName, Map<String, DatasourceAdapter>> dataverseAdapters =
+                                adapters.computeIfAbsent(databaseName, k -> new HashMap<>());
+                        dataverseAdapters.put(dataverseName, new HashMap<>());
+
+                        return databaseDataverses.put(dataverseName, dataverse);
                     }
                     return null;
                 }
@@ -191,13 +209,12 @@
                     addIndexIfNotExistsInternal(index);
                 }
 
-                Map<String, Dataset> m = datasets.get(dataset.getDataverseName());
-                if (m == null) {
-                    m = new HashMap<>();
-                    datasets.put(dataset.getDataverseName(), m);
-                }
-                if (!m.containsKey(dataset.getDatasetName())) {
-                    return m.put(dataset.getDatasetName(), dataset);
+                Map<DataverseName, Map<String, Dataset>> databaseDataverses =
+                        datasets.computeIfAbsent(dataset.getDatabaseName(), k -> new HashMap<>());
+                Map<String, Dataset> dataverseDatasets =
+                        databaseDataverses.computeIfAbsent(dataset.getDataverseName(), k -> new HashMap<>());
+                if (!dataverseDatasets.containsKey(dataset.getDatasetName())) {
+                    return dataverseDatasets.put(dataset.getDatasetName(), dataset);
                 }
                 return null;
             }
@@ -212,13 +229,12 @@
 
     public Datatype addDatatypeIfNotExists(Datatype datatype) {
         synchronized (datatypes) {
-            Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
-            if (m == null) {
-                m = new HashMap<>();
-                datatypes.put(datatype.getDataverseName(), m);
-            }
-            if (!m.containsKey(datatype.getDatatypeName())) {
-                return m.put(datatype.getDatatypeName(), datatype);
+            Map<DataverseName, Map<String, Datatype>> databaseDataverses =
+                    datatypes.computeIfAbsent(datatype.getDatabaseName(), k -> new HashMap<>());
+            Map<String, Datatype> dataverseDatatypes =
+                    databaseDataverses.computeIfAbsent(datatype.getDataverseName(), k -> new HashMap<>());
+            if (!dataverseDatatypes.containsKey(datatype.getDatatypeName())) {
+                return dataverseDatatypes.put(datatype.getDatatypeName(), datatype);
             }
             return null;
         }
@@ -232,15 +248,12 @@
 
     public CompactionPolicy addCompactionPolicyIfNotExists(CompactionPolicy compactionPolicy) {
         synchronized (compactionPolicies) {
-            Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
-            if (p == null) {
-                p = new HashMap<>();
-                p.put(compactionPolicy.getPolicyName(), compactionPolicy);
-                compactionPolicies.put(compactionPolicy.getDataverseName(), p);
-            } else {
-                if (p.get(compactionPolicy.getPolicyName()) == null) {
-                    p.put(compactionPolicy.getPolicyName(), compactionPolicy);
-                }
+            Map<DataverseName, Map<String, CompactionPolicy>> databaseDataverses =
+                    compactionPolicies.computeIfAbsent(compactionPolicy.getDatabaseName(), k -> new HashMap<>());
+            Map<String, CompactionPolicy> dataverseCompactionPolicies =
+                    databaseDataverses.computeIfAbsent(compactionPolicy.getDataverseName(), k -> new HashMap<>());
+            if (!dataverseCompactionPolicies.containsKey(compactionPolicy.getPolicyName())) {
+                return dataverseCompactionPolicies.put(compactionPolicy.getPolicyName(), compactionPolicy);
             }
             return null;
         }
@@ -248,7 +261,12 @@
 
     public CompactionPolicy dropCompactionPolicy(CompactionPolicy compactionPolicy) {
         synchronized (compactionPolicies) {
-            Map<String, CompactionPolicy> p = compactionPolicies.get(compactionPolicy.getDataverseName());
+            Map<DataverseName, Map<String, CompactionPolicy>> databaseDataverses =
+                    compactionPolicies.get(compactionPolicy.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, CompactionPolicy> p = databaseDataverses.get(compactionPolicy.getDataverseName());
             if (p != null && p.get(compactionPolicy.getPolicyName()) != null) {
                 return p.remove(compactionPolicy.getPolicyName());
             }
@@ -270,7 +288,31 @@
                                                 synchronized (feeds) {
                                                     synchronized (compactionPolicies) {
                                                         synchronized (synonyms) {
-                                                            return databases.remove(database.getDatabaseName());
+                                                            String databaseName = database.getDatabaseName();
+                                                            synonyms.remove(databaseName);
+                                                            compactionPolicies.remove(databaseName);
+                                                            //TODO(DB): how about feedConnections, feedPolicies?
+                                                            feeds.remove(databaseName);
+                                                            libraries.remove(databaseName);
+                                                            adapters.remove(databaseName);
+                                                            fullTextFilters.remove(databaseName);
+                                                            fullTextConfigs.remove(databaseName);
+                                                            datatypes.remove(databaseName);
+                                                            indexes.remove(databaseName);
+                                                            datasets.remove(databaseName);
+                                                            dataverses.remove(databaseName);
+
+                                                            List<FunctionSignature> markedFunctionsForRemoval =
+                                                                    new ArrayList<>();
+                                                            for (FunctionSignature signature : functions.keySet()) {
+                                                                if (signature.getDatabaseName().equals(databaseName)) {
+                                                                    markedFunctionsForRemoval.add(signature);
+                                                                }
+                                                            }
+                                                            for (FunctionSignature signature : markedFunctionsForRemoval) {
+                                                                functions.remove(signature);
+                                                            }
+                                                            return databases.remove(databaseName);
                                                         }
                                                     }
                                                 }
@@ -299,28 +341,74 @@
                                             synchronized (feeds) {
                                                 synchronized (compactionPolicies) {
                                                     synchronized (synonyms) {
-                                                        datasets.remove(dataverse.getDataverseName());
-                                                        indexes.remove(dataverse.getDataverseName());
-                                                        datatypes.remove(dataverse.getDataverseName());
-                                                        adapters.remove(dataverse.getDataverseName());
-                                                        compactionPolicies.remove(dataverse.getDataverseName());
+                                                        String databaseName = dataverse.getDatabaseName();
+                                                        DataverseName dataverseName = dataverse.getDataverseName();
+                                                        Map<DataverseName, Map<String, Dataset>> ds =
+                                                                datasets.get(databaseName);
+                                                        if (ds != null) {
+                                                            ds.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, Map<String, Index>>> idx =
+                                                                indexes.get(databaseName);
+                                                        if (idx != null) {
+                                                            idx.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, Datatype>> dt =
+                                                                datatypes.get(databaseName);
+                                                        if (dt != null) {
+                                                            dt.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, DatasourceAdapter>> ad =
+                                                                adapters.get(databaseName);
+                                                        if (ad != null) {
+                                                            ad.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, CompactionPolicy>> cp =
+                                                                compactionPolicies.get(databaseName);
+                                                        if (cp != null) {
+                                                            cp.remove(dataverseName);
+                                                        }
+
                                                         List<FunctionSignature> markedFunctionsForRemoval =
                                                                 new ArrayList<>();
                                                         for (FunctionSignature signature : functions.keySet()) {
-                                                            if (signature.getDataverseName()
-                                                                    .equals(dataverse.getDataverseName())) {
+                                                            if (signature.getDatabaseName().equals(databaseName)
+                                                                    && signature.getDataverseName()
+                                                                            .equals(dataverseName)) {
                                                                 markedFunctionsForRemoval.add(signature);
                                                             }
                                                         }
                                                         for (FunctionSignature signature : markedFunctionsForRemoval) {
                                                             functions.remove(signature);
                                                         }
-                                                        fullTextConfigs.remove(dataverse.getDataverseName());
-                                                        fullTextFilters.remove(dataverse.getDataverseName());
-                                                        libraries.remove(dataverse.getDataverseName());
-                                                        feeds.remove(dataverse.getDataverseName());
-                                                        synonyms.remove(dataverse.getDataverseName());
-                                                        return dataverses.remove(dataverse.getDataverseName());
+                                                        Map<DataverseName, Map<String, FullTextConfigMetadataEntity>> ftc =
+                                                                fullTextConfigs.get(databaseName);
+                                                        if (ftc != null) {
+                                                            ftc.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, FullTextFilterMetadataEntity>> ftf =
+                                                                fullTextFilters.get(databaseName);
+                                                        if (ftf != null) {
+                                                            ftf.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, Library>> lib =
+                                                                libraries.get(databaseName);
+                                                        if (lib != null) {
+                                                            lib.remove(dataverseName);
+                                                        }
+                                                        //TODO(DB): how about feedConnections, feedPolicies?
+                                                        Map<DataverseName, Map<String, Feed>> fd =
+                                                                feeds.get(databaseName);
+                                                        if (fd != null) {
+                                                            fd.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Map<String, Synonym>> syn =
+                                                                synonyms.get(databaseName);
+                                                        if (syn != null) {
+                                                            syn.remove(dataverseName);
+                                                        }
+                                                        Map<DataverseName, Dataverse> dv = dataverses.get(databaseName);
+                                                        return dv == null ? null : dv.remove(dataverseName);
                                                     }
                                                 }
                                             }
@@ -340,13 +428,20 @@
             synchronized (indexes) {
 
                 //remove the indexes of the dataset from indexes' cache
-                Map<String, Map<String, Index>> datasetMap = indexes.get(dataset.getDataverseName());
-                if (datasetMap != null) {
-                    datasetMap.remove(dataset.getDatasetName());
+                Map<DataverseName, Map<String, Map<String, Index>>> idxDb = indexes.get(dataset.getDatabaseName());
+                if (idxDb != null) {
+                    Map<String, Map<String, Index>> datasetMap = idxDb.get(dataset.getDataverseName());
+                    if (datasetMap != null) {
+                        datasetMap.remove(dataset.getDatasetName());
+                    }
                 }
 
                 //remove the dataset from datasets' cache
-                Map<String, Dataset> m = datasets.get(dataset.getDataverseName());
+                Map<DataverseName, Map<String, Dataset>> dsDb = datasets.get(dataset.getDatabaseName());
+                if (dsDb == null) {
+                    return null;
+                }
+                Map<String, Dataset> m = dsDb.get(dataset.getDataverseName());
                 if (m == null) {
                     return null;
                 }
@@ -357,7 +452,12 @@
 
     public Index dropIndex(Index index) {
         synchronized (indexes) {
-            Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
+            Map<DataverseName, Map<String, Map<String, Index>>> databaseDataverses =
+                    indexes.get(index.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, Map<String, Index>> datasetMap = databaseDataverses.get(index.getDataverseName());
             if (datasetMap == null) {
                 return null;
             }
@@ -372,7 +472,11 @@
 
     public Datatype dropDatatype(Datatype datatype) {
         synchronized (datatypes) {
-            Map<String, Datatype> m = datatypes.get(datatype.getDataverseName());
+            Map<DataverseName, Map<String, Datatype>> databaseDataverses = datatypes.get(datatype.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, Datatype> m = databaseDataverses.get(datatype.getDataverseName());
             if (m == null) {
                 return null;
             }
@@ -386,25 +490,37 @@
         }
     }
 
-    public Dataverse getDataverse(DataverseName dataverseName) {
-        synchronized (dataverses) {
-            return dataverses.get(dataverseName);
+    public Database getDatabase(String databaseName) {
+        synchronized (databases) {
+            return databases.get(databaseName);
         }
     }
 
-    public Dataset getDataset(DataverseName dataverseName, String datasetName) {
+    public Dataverse getDataverse(String databaseName, DataverseName dataverseName) {
+        synchronized (dataverses) {
+            Map<DataverseName, Dataverse> db = dataverses.get(databaseName);
+            return db == null ? null : db.get(dataverseName);
+        }
+    }
+
+    public Dataset getDataset(String databaseName, DataverseName dataverseName, String datasetName) {
         synchronized (datasets) {
-            Map<String, Dataset> m = datasets.get(dataverseName);
-            if (m == null) {
+            Map<DataverseName, Map<String, Dataset>> db = datasets.get(databaseName);
+            if (db == null) {
                 return null;
             }
-            return m.get(datasetName);
+            Map<String, Dataset> dv = db.get(dataverseName);
+            return dv == null ? null : dv.get(datasetName);
         }
     }
 
-    public Index getIndex(DataverseName dataverseName, String datasetName, String indexName) {
+    public Index getIndex(String databaseName, DataverseName dataverseName, String datasetName, String indexName) {
         synchronized (indexes) {
-            Map<String, Map<String, Index>> datasetMap = indexes.get(dataverseName);
+            Map<DataverseName, Map<String, Map<String, Index>>> db = indexes.get(databaseName);
+            if (db == null) {
+                return null;
+            }
+            Map<String, Map<String, Index>> datasetMap = db.get(dataverseName);
             if (datasetMap == null) {
                 return null;
             }
@@ -416,9 +532,13 @@
         }
     }
 
-    public Datatype getDatatype(DataverseName dataverseName, String datatypeName) {
+    public Datatype getDatatype(String databaseName, DataverseName dataverseName, String datatypeName) {
         synchronized (datatypes) {
-            Map<String, Datatype> m = datatypes.get(dataverseName);
+            Map<DataverseName, Map<String, Datatype>> db = datatypes.get(databaseName);
+            if (db == null) {
+                return null;
+            }
+            Map<String, Datatype> m = db.get(dataverseName);
             if (m == null) {
                 return null;
             }
@@ -438,9 +558,14 @@
         }
     }
 
-    public FullTextConfigMetadataEntity getFullTextConfig(DataverseName dataverseName, String configName) {
+    public FullTextConfigMetadataEntity getFullTextConfig(String databaseName, DataverseName dataverseName,
+            String configName) {
         synchronized (fullTextConfigs) {
-            Map<String, FullTextConfigMetadataEntity> m = fullTextConfigs.get(dataverseName);
+            Map<DataverseName, Map<String, FullTextConfigMetadataEntity>> db = fullTextConfigs.get(databaseName);
+            if (db == null) {
+                return null;
+            }
+            Map<String, FullTextConfigMetadataEntity> m = db.get(dataverseName);
             if (m == null) {
                 return null;
             }
@@ -448,9 +573,14 @@
         }
     }
 
-    public FullTextFilterMetadataEntity getFullTextFilter(DataverseName dataverseName, String filterName) {
+    public FullTextFilterMetadataEntity getFullTextFilter(String databaseName, DataverseName dataverseName,
+            String filterName) {
         synchronized (fullTextFilters) {
-            Map<String, FullTextFilterMetadataEntity> m = fullTextFilters.get(dataverseName);
+            Map<DataverseName, Map<String, FullTextFilterMetadataEntity>> db = fullTextFilters.get(databaseName);
+            if (db == null) {
+                return null;
+            }
+            Map<String, FullTextFilterMetadataEntity> m = db.get(dataverseName);
             if (m == null) {
                 return null;
             }
@@ -458,9 +588,13 @@
         }
     }
 
-    public List<Dataset> getDataverseDatasets(DataverseName dataverseName) {
+    public List<Dataset> getDataverseDatasets(String databaseName, DataverseName dataverseName) {
         synchronized (datasets) {
-            Map<String, Dataset> m = datasets.get(dataverseName);
+            Map<DataverseName, Map<String, Dataset>> db = datasets.get(databaseName);
+            if (db == null) {
+                return Collections.emptyList();
+            }
+            Map<String, Dataset> m = db.get(dataverseName);
             if (m == null) {
                 return Collections.emptyList();
             }
@@ -468,9 +602,17 @@
         }
     }
 
-    public List<Index> getDatasetIndexes(DataverseName dataverseName, String datasetName) {
-        synchronized (datasets) {
-            Map<String, Index> map = indexes.get(dataverseName).get(datasetName);
+    public List<Index> getDatasetIndexes(String databaseName, DataverseName dataverseName, String datasetName) {
+        synchronized (indexes) {
+            Map<DataverseName, Map<String, Map<String, Index>>> db = indexes.get(databaseName);
+            if (db == null) {
+                return Collections.emptyList();
+            }
+            Map<String, Map<String, Index>> dv = db.get(dataverseName);
+            if (dv == null) {
+                return Collections.emptyList();
+            }
+            Map<String, Index> map = dv.get(datasetName);
             if (map == null) {
                 return Collections.emptyList();
             }
@@ -519,13 +661,16 @@
     }
 
     public FullTextFilterMetadataEntity addFullTextFilterIfNotExists(FullTextFilterMetadataEntity filter) {
+        String databaseName = filter.getFullTextFilter().getDatabaseName();
         DataverseName dataverseName = filter.getFullTextFilter().getDataverseName();
         String filterName = filter.getFullTextFilter().getName();
         synchronized (fullTextFilters) {
-            Map<String, FullTextFilterMetadataEntity> m = fullTextFilters.get(dataverseName);
+            Map<DataverseName, Map<String, FullTextFilterMetadataEntity>> databaseDataverses =
+                    fullTextFilters.computeIfAbsent(databaseName, k -> new HashMap<>());
+            Map<String, FullTextFilterMetadataEntity> m = databaseDataverses.get(dataverseName);
             if (m == null) {
                 m = new HashMap<>();
-                fullTextFilters.put(dataverseName, m);
+                databaseDataverses.put(dataverseName, m);
             }
             if (!m.containsKey(filterName)) {
                 return m.put(filterName, filter);
@@ -538,7 +683,12 @@
         DataverseName dataverseName = filterMetadataEntity.getFullTextFilter().getDataverseName();
         String filterName = filterMetadataEntity.getFullTextFilter().getName();
         synchronized (fullTextFilters) {
-            Map<String, FullTextFilterMetadataEntity> m = fullTextFilters.get(dataverseName);
+            Map<DataverseName, Map<String, FullTextFilterMetadataEntity>> databaseDataverses =
+                    fullTextFilters.get(filterMetadataEntity.getFullTextFilter().getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, FullTextFilterMetadataEntity> m = databaseDataverses.get(dataverseName);
             if (m == null) {
                 return null;
             }
@@ -549,13 +699,16 @@
     public FullTextConfigMetadataEntity addFullTextConfigIfNotExists(
             FullTextConfigMetadataEntity configMetadataEntity) {
         FullTextConfigDescriptor config = configMetadataEntity.getFullTextConfig();
+        String databaseName = config.getDatabaseName();
         DataverseName dataverseName = config.getDataverseName();
         String configName = config.getName();
         synchronized (fullTextConfigs) {
-            Map<String, FullTextConfigMetadataEntity> m = fullTextConfigs.get(dataverseName);
+            Map<DataverseName, Map<String, FullTextConfigMetadataEntity>> databaseDataverses =
+                    fullTextConfigs.computeIfAbsent(databaseName, k -> new HashMap<>());
+            Map<String, FullTextConfigMetadataEntity> m = databaseDataverses.get(dataverseName);
             if (m == null) {
                 m = new HashMap<>();
-                fullTextConfigs.put(dataverseName, m);
+                databaseDataverses.put(dataverseName, m);
             }
             if (!m.containsKey(configName)) {
                 return m.put(configName, configMetadataEntity);
@@ -569,7 +722,12 @@
         DataverseName dataverseName = config.getDataverseName();
         String configName = config.getName();
         synchronized (fullTextConfigs) {
-            Map<String, FullTextConfigMetadataEntity> m = fullTextConfigs.get(dataverseName);
+            Map<DataverseName, Map<String, FullTextConfigMetadataEntity>> databaseDataverses =
+                    fullTextConfigs.get(config.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, FullTextConfigMetadataEntity> m = databaseDataverses.get(dataverseName);
             if (m == null) {
                 return null;
             }
@@ -579,11 +737,13 @@
 
     public Object addFeedPolicyIfNotExists(FeedPolicyEntity feedPolicy) {
         synchronized (feedPolicies) {
-            Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
+            Map<DataverseName, Map<String, FeedPolicyEntity>> databaseDataverses =
+                    feedPolicies.computeIfAbsent(feedPolicy.getDatabaseName(), k -> new HashMap<>());
+            Map<String, FeedPolicyEntity> p = databaseDataverses.get(feedPolicy.getDataverseName());
             if (p == null) {
                 p = new HashMap<>();
                 p.put(feedPolicy.getPolicyName(), feedPolicy);
-                feedPolicies.put(feedPolicy.getDataverseName(), p);
+                databaseDataverses.put(feedPolicy.getDataverseName(), p);
             } else {
                 if (p.get(feedPolicy.getPolicyName()) == null) {
                     p.put(feedPolicy.getPolicyName(), feedPolicy);
@@ -595,7 +755,12 @@
 
     public Object dropFeedPolicy(FeedPolicyEntity feedPolicy) {
         synchronized (feedPolicies) {
-            Map<String, FeedPolicyEntity> p = feedPolicies.get(feedPolicy.getDataverseName());
+            Map<DataverseName, Map<String, FeedPolicyEntity>> databaseDataverses =
+                    feedPolicies.get(feedPolicy.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, FeedPolicyEntity> p = databaseDataverses.get(feedPolicy.getDataverseName());
             if (p != null && p.get(feedPolicy.getPolicyName()) != null) {
                 return p.remove(feedPolicy.getPolicyName()).getPolicyName();
             }
@@ -605,11 +770,13 @@
 
     public DatasourceAdapter addAdapterIfNotExists(DatasourceAdapter adapter) {
         synchronized (adapters) {
+            Map<DataverseName, Map<String, DatasourceAdapter>> databaseDataverses =
+                    adapters.computeIfAbsent(adapter.getAdapterIdentifier().getDatabaseName(), k -> new HashMap<>());
             Map<String, DatasourceAdapter> adaptersInDataverse =
-                    adapters.get(adapter.getAdapterIdentifier().getDataverseName());
+                    databaseDataverses.get(adapter.getAdapterIdentifier().getDataverseName());
             if (adaptersInDataverse == null) {
                 adaptersInDataverse = new HashMap<>();
-                adapters.put(adapter.getAdapterIdentifier().getDataverseName(), adaptersInDataverse);
+                databaseDataverses.put(adapter.getAdapterIdentifier().getDataverseName(), adaptersInDataverse);
             }
             DatasourceAdapter adapterObject = adaptersInDataverse.get(adapter.getAdapterIdentifier().getName());
             if (adapterObject == null) {
@@ -621,8 +788,13 @@
 
     public DatasourceAdapter dropAdapterIfExists(DatasourceAdapter adapter) {
         synchronized (adapters) {
+            Map<DataverseName, Map<String, DatasourceAdapter>> databaseDataverses =
+                    adapters.get(adapter.getAdapterIdentifier().getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
             Map<String, DatasourceAdapter> adaptersInDataverse =
-                    adapters.get(adapter.getAdapterIdentifier().getDataverseName());
+                    databaseDataverses.get(adapter.getAdapterIdentifier().getDataverseName());
             if (adaptersInDataverse != null) {
                 return adaptersInDataverse.remove(adapter.getAdapterIdentifier().getName());
             }
@@ -632,12 +804,14 @@
 
     public Library addLibraryIfNotExists(Library library) {
         synchronized (libraries) {
-            Map<String, Library> libsInDataverse = libraries.get(library.getDataverseName());
+            Map<DataverseName, Map<String, Library>> databaseDataverses =
+                    libraries.computeIfAbsent(library.getDatabaseName(), k -> new HashMap<>());
+            Map<String, Library> libsInDataverse = databaseDataverses.get(library.getDataverseName());
             boolean needToAdd = (libsInDataverse == null || libsInDataverse.get(library.getName()) != null);
             if (needToAdd) {
                 if (libsInDataverse == null) {
                     libsInDataverse = new HashMap<>();
-                    libraries.put(library.getDataverseName(), libsInDataverse);
+                    databaseDataverses.put(library.getDataverseName(), libsInDataverse);
                 }
                 return libsInDataverse.put(library.getName(), library);
             }
@@ -647,7 +821,11 @@
 
     public Library dropLibrary(Library library) {
         synchronized (libraries) {
-            Map<String, Library> librariesInDataverse = libraries.get(library.getDataverseName());
+            Map<DataverseName, Map<String, Library>> databaseDataverses = libraries.get(library.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, Library> librariesInDataverse = databaseDataverses.get(library.getDataverseName());
             if (librariesInDataverse != null) {
                 return librariesInDataverse.remove(library.getName());
             }
@@ -657,10 +835,13 @@
 
     public FeedConnection addFeedConnectionIfNotExists(FeedConnection feedConnection) {
         synchronized (feedConnections) {
-            Map<String, FeedConnection> feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName());
+            Map<DataverseName, Map<String, FeedConnection>> databaseDataverses =
+                    feedConnections.computeIfAbsent(feedConnection.getDatabaseName(), k -> new HashMap<>());
+            Map<String, FeedConnection> feedConnsInDataverse =
+                    databaseDataverses.get(feedConnection.getDataverseName());
             if (feedConnsInDataverse == null) {
                 feedConnsInDataverse = new HashMap<>();
-                feedConnections.put(feedConnection.getDataverseName(), feedConnsInDataverse);
+                databaseDataverses.put(feedConnection.getDataverseName(), feedConnsInDataverse);
             }
             return feedConnsInDataverse.put(feedConnection.getConnectionId(), feedConnection);
         }
@@ -668,7 +849,13 @@
 
     public FeedConnection dropFeedConnection(FeedConnection feedConnection) {
         synchronized (feedConnections) {
-            Map<String, FeedConnection> feedConnsInDataverse = feedConnections.get(feedConnection.getDataverseName());
+            Map<DataverseName, Map<String, FeedConnection>> databaseDataverses =
+                    feedConnections.get(feedConnection.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, FeedConnection> feedConnsInDataverse =
+                    databaseDataverses.get(feedConnection.getDataverseName());
             if (feedConnsInDataverse != null) {
                 return feedConnsInDataverse.remove(feedConnection.getConnectionId());
             } else {
@@ -679,10 +866,12 @@
 
     public Feed addFeedIfNotExists(Feed feed) {
         synchronized (feeds) {
-            Map<String, Feed> feedsInDataverse = feeds.get(feed.getDataverseName());
+            Map<DataverseName, Map<String, Feed>> databaseDataverses =
+                    feeds.computeIfAbsent(feed.getDatabaseName(), k -> new HashMap<>());
+            Map<String, Feed> feedsInDataverse = databaseDataverses.get(feed.getDataverseName());
             if (feedsInDataverse == null) {
                 feedsInDataverse = new HashMap<>();
-                feeds.put(feed.getDataverseName(), feedsInDataverse);
+                databaseDataverses.put(feed.getDataverseName(), feedsInDataverse);
             }
             return feedsInDataverse.put(feed.getFeedName(), feed);
         }
@@ -690,7 +879,11 @@
 
     public Feed dropFeedIfExists(Feed feed) {
         synchronized (feeds) {
-            Map<String, Feed> feedsInDataverse = feeds.get(feed.getDataverseName());
+            Map<DataverseName, Map<String, Feed>> databaseDataverses = feeds.get(feed.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, Feed> feedsInDataverse = databaseDataverses.get(feed.getDataverseName());
             if (feedsInDataverse != null) {
                 return feedsInDataverse.remove(feed.getFeedName());
             }
@@ -700,18 +893,25 @@
 
     public Synonym addSynonymIfNotExists(Synonym synonym) {
         synchronized (synonyms) {
-            Map<String, Synonym> synonymsInDataverse = synonyms.get(synonym.getDataverseName());
+            Map<DataverseName, Map<String, Synonym>> databaseDataverses =
+                    synonyms.computeIfAbsent(synonym.getDatabaseName(), k -> new HashMap<>());
+            Map<String, Synonym> synonymsInDataverse = databaseDataverses.get(synonym.getDataverseName());
             if (synonymsInDataverse == null) {
                 synonymsInDataverse = new HashMap<>();
-                synonyms.put(synonym.getDataverseName(), synonymsInDataverse);
+                databaseDataverses.put(synonym.getDataverseName(), synonymsInDataverse);
             }
+            //TODO(DB): this actually overwrites if existing
             return synonymsInDataverse.put(synonym.getSynonymName(), synonym);
         }
     }
 
     public Synonym dropSynonym(Synonym synonym) {
         synchronized (synonyms) {
-            Map<String, Synonym> synonymsInDataverse = synonyms.get(synonym.getDataverseName());
+            Map<DataverseName, Map<String, Synonym>> databaseDataverses = synonyms.get(synonym.getDatabaseName());
+            if (databaseDataverses == null) {
+                return null;
+            }
+            Map<String, Synonym> synonymsInDataverse = databaseDataverses.get(synonym.getDataverseName());
             if (synonymsInDataverse != null) {
                 return synonymsInDataverse.remove(synonym.getSynonymName());
             }
@@ -720,10 +920,12 @@
     }
 
     private Index addIndexIfNotExistsInternal(Index index) {
-        Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
+        Map<DataverseName, Map<String, Map<String, Index>>> databaseDataverses =
+                indexes.computeIfAbsent(index.getDatabaseName(), k -> new HashMap<>());
+        Map<String, Map<String, Index>> datasetMap = databaseDataverses.get(index.getDataverseName());
         if (datasetMap == null) {
             datasetMap = new HashMap<>();
-            indexes.put(index.getDataverseName(), datasetMap);
+            databaseDataverses.put(index.getDataverseName(), datasetMap);
         }
         Map<String, Index> indexMap = datasetMap.get(index.getDatasetName());
         if (indexMap == null) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index f2d9883..040b489 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -171,6 +171,37 @@
     }
 
     @Override
+    public Database getDatabase(MetadataTransactionContext ctx, String databaseName) throws AlgebricksException {
+        Objects.requireNonNull(databaseName);
+        // first look in the context to see if this transaction created the
+        // requested database itself (but the database is still uncommitted)
+        Database database = ctx.getDatabase(databaseName);
+        if (database != null) {
+            // don't add this database to the cache, since it is still uncommitted
+            return database;
+        }
+        if (ctx.databaseIsDropped(databaseName)) {
+            // database has been dropped by this transaction but could still be in the cache
+            return null;
+        }
+        database = cache.getDatabase(databaseName);
+        if (database != null) {
+            // database is already in the cache, don't add it again
+            return database;
+        }
+        try {
+            database = metadataNode.getDatabase(ctx.getTxnId(), databaseName);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+        // we fetched the database from the MetadataNode. add it to the cache when this transaction commits
+        if (database != null) {
+            ctx.addDatabase(database);
+        }
+        return database;
+    }
+
+    @Override
     public void addDatabase(MetadataTransactionContext ctx, Database database) throws AlgebricksException {
         try {
             metadataNode.addDatabase(ctx.getTxnId(), database);
@@ -240,18 +271,18 @@
         Objects.requireNonNull(database);
         // First look in the context to see if this transaction created the
         // requested dataverse itself (but the dataverse is still uncommitted).
-        Dataverse dataverse = ctx.getDataverse(dataverseName);
+        Dataverse dataverse = ctx.getDataverse(database, dataverseName);
         if (dataverse != null) {
             // Don't add this dataverse to the cache, since it is still
             // uncommitted.
             return dataverse;
         }
-        if (ctx.dataverseIsDropped(dataverseName)) {
+        if (ctx.dataverseIsDropped(database, dataverseName)) {
             // Dataverse has been dropped by this transaction but could still be
             // in the cache.
             return null;
         }
-        dataverse = cache.getDataverse(dataverseName);
+        dataverse = cache.getDataverse(database, dataverseName);
         if (dataverse != null) {
             // Dataverse is already in the cache, don't add it again.
             return dataverse;
@@ -316,19 +347,19 @@
         Objects.requireNonNull(database);
         // First look in the context to see if this transaction created the
         // requested dataset itself (but the dataset is still uncommitted).
-        Dataset dataset = ctx.getDataset(dataverseName, datasetName);
+        Dataset dataset = ctx.getDataset(database, dataverseName, datasetName);
         if (dataset != null) {
             // Don't add this dataverse to the cache, since it is still
             // uncommitted.
             return dataset;
         }
-        if (ctx.datasetIsDropped(dataverseName, datasetName)) {
+        if (ctx.datasetIsDropped(database, dataverseName, datasetName)) {
             // Dataset has been dropped by this transaction but could still be
             // in the cache.
             return null;
         }
 
-        dataset = cache.getDataset(dataverseName, datasetName);
+        dataset = cache.getDataset(database, dataverseName, datasetName);
         if (dataset != null) {
             // Dataset is already in the cache, don't add it again.
             return dataset;
@@ -420,19 +451,19 @@
         Objects.requireNonNull(database);
         // First look in the context to see if this transaction created the
         // requested datatype itself (but the datatype is still uncommitted).
-        Datatype datatype = ctx.getDatatype(dataverseName, datatypeName);
+        Datatype datatype = ctx.getDatatype(database, dataverseName, datatypeName);
         if (datatype != null) {
             // Don't add this dataverse to the cache, since it is still
             // uncommitted.
             return datatype;
         }
-        if (ctx.datatypeIsDropped(dataverseName, datatypeName)) {
+        if (ctx.datatypeIsDropped(database, dataverseName, datatypeName)) {
             // Datatype has been dropped by this transaction but could still be
             // in the cache.
             return null;
         }
 
-        datatype = cache.getDatatype(dataverseName, datatypeName);
+        datatype = cache.getDatatype(database, dataverseName, datatypeName);
         if (datatype != null) {
             // Datatype is already in the cache, don't add it again.
             return datatype;
@@ -488,20 +519,20 @@
         Objects.requireNonNull(database);
         // First look in the context to see if this transaction created the
         // requested index itself (but the index is still uncommitted).
-        Index index = ctx.getIndex(dataverseName, datasetName, indexName);
+        Index index = ctx.getIndex(database, dataverseName, datasetName, indexName);
         if (index != null) {
             // Don't add this index to the cache, since it is still
             // uncommitted.
             return index;
         }
 
-        if (ctx.indexIsDropped(dataverseName, datasetName, indexName)) {
+        if (ctx.indexIsDropped(database, dataverseName, datasetName, indexName)) {
             // Index has been dropped by this transaction but could still be
             // in the cache.
             return null;
         }
 
-        index = cache.getIndex(dataverseName, datasetName, indexName);
+        index = cache.getIndex(database, dataverseName, datasetName, indexName);
         if (index != null) {
             // Index is already in the cache, don't add it again.
             return index;
@@ -632,7 +663,7 @@
             // in the cache.
             return null;
         }
-        if (ctx.getDataverse(functionSignature.getDataverseName()) != null) {
+        if (ctx.getDataverse(functionSignature.getDatabaseName(), functionSignature.getDataverseName()) != null) {
             // This transaction has dropped and subsequently created the same
             // dataverse.
             return null;
@@ -695,26 +726,26 @@
         Objects.requireNonNull(database);
         // First look in the context to see if this transaction created the
         // requested full-text filter itself (but the full-text filter is still uncommitted).
-        FullTextFilterMetadataEntity filter = ctx.getFullTextFilter(dataverseName, filterName);
+        FullTextFilterMetadataEntity filter = ctx.getFullTextFilter(database, dataverseName, filterName);
         if (filter != null) {
             // Don't add this filter to the cache, since it is still
             // uncommitted.
             return filter;
         }
 
-        if (ctx.fullTextFilterIsDropped(dataverseName, filterName)) {
+        if (ctx.fullTextFilterIsDropped(database, dataverseName, filterName)) {
             // Filter has been dropped by this transaction but could still be
             // in the cache.
             return null;
         }
 
-        if (ctx.getDataverse(dataverseName) != null) {
+        if (ctx.getDataverse(database, dataverseName) != null) {
             // This transaction has dropped and subsequently created the same
             // dataverse.
             return null;
         }
 
-        filter = cache.getFullTextFilter(dataverseName, filterName);
+        filter = cache.getFullTextFilter(database, dataverseName, filterName);
         if (filter != null) {
             // filter is already in the cache, don't add it again.
             return filter;
@@ -754,26 +785,26 @@
         Objects.requireNonNull(database);
         // First look in the context to see if this transaction created the
         // requested full-text config itself (but the full-text config is still uncommitted).
-        FullTextConfigMetadataEntity configMetadataEntity = ctx.getFullTextConfig(dataverseName, configName);
+        FullTextConfigMetadataEntity configMetadataEntity = ctx.getFullTextConfig(database, dataverseName, configName);
         if (configMetadataEntity != null) {
             // Don't add this config to the cache, since it is still
             // uncommitted.
             return configMetadataEntity;
         }
 
-        if (ctx.fullTextConfigIsDropped(dataverseName, configName)) {
+        if (ctx.fullTextConfigIsDropped(database, dataverseName, configName)) {
             // config has been dropped by this transaction but could still be
             // in the cache.
             return null;
         }
 
-        if (ctx.getDataverse(dataverseName) != null) {
+        if (ctx.getDataverse(database, dataverseName) != null) {
             // This transaction has dropped and subsequently created the same
             // dataverse.
             return null;
         }
 
-        configMetadataEntity = cache.getFullTextConfig(dataverseName, configName);
+        configMetadataEntity = cache.getFullTextConfig(database, dataverseName, configName);
         if (configMetadataEntity != null) {
             // config is already in the cache, don't add it again.
             return configMetadataEntity;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 7d7289d..e7d3a2f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -1012,6 +1012,23 @@
     }
 
     @Override
+    public Database getDatabase(TxnId txnId, String databaseName) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(databaseName);
+            DatabaseTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatabaseTupleTranslator(false);
+            IValueExtractor<Database> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Database> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getDatabaseEntity().getIndex(), searchKey, valueExtractor, results);
+            if (results.isEmpty()) {
+                return null;
+            }
+            return results.get(0);
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    @Override
     public Dataverse getDataverse(TxnId txnId, String database, DataverseName dataverseName)
             throws AlgebricksException {
         try {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 88fd202..71015fc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -142,6 +142,27 @@
         logAndApply(new MetadataLogicalOperation(compactionPolicy, true));
     }
 
+    public void addLibrary(Library library) {
+        droppedCache.dropLibrary(library);
+        logAndApply(new MetadataLogicalOperation(library, true));
+    }
+
+    public void addFeedPolicy(FeedPolicyEntity feedPolicy) {
+        droppedCache.dropFeedPolicy(feedPolicy);
+        logAndApply(new MetadataLogicalOperation(feedPolicy, true));
+
+    }
+
+    public void addFeed(Feed feed) {
+        droppedCache.dropFeedIfExists(feed);
+        logAndApply(new MetadataLogicalOperation(feed, true));
+    }
+
+    public void addFeedConnection(FeedConnection feedConnection) {
+        droppedCache.dropFeedConnection(feedConnection);
+        logAndApply(new MetadataLogicalOperation(feedConnection, true));
+    }
+
     public void dropDataset(String database, DataverseName dataverseName, String datasetName) {
         Dataset dataset = new Dataset(database, dataverseName, datasetName, null, null, null, null, null, null, null,
                 null, -1, MetadataUtil.PENDING_NO_OP);
@@ -168,11 +189,6 @@
         logAndApply(new MetadataLogicalOperation(dataverse, false));
     }
 
-    public void addLibrary(Library library) {
-        droppedCache.dropLibrary(library);
-        logAndApply(new MetadataLogicalOperation(library, true));
-    }
-
     public void dropDataDatatype(String database, DataverseName dataverseName, String datatypeName) {
         Datatype datatype = new Datatype(database, dataverseName, datatypeName, null, false);
         droppedCache.addDatatypeIfNotExists(datatype);
@@ -233,80 +249,11 @@
         logAndApply(new MetadataLogicalOperation(library, false));
     }
 
-    public void logAndApply(MetadataLogicalOperation op) {
-        opLog.add(op);
-        doOperation(op);
-    }
-
-    public boolean dataverseIsDropped(DataverseName dataverseName) {
-        return droppedCache.getDataverse(dataverseName) != null;
-    }
-
-    public boolean datasetIsDropped(DataverseName dataverseName, String datasetName) {
-        if (droppedCache.getDataverse(dataverseName) != null) {
-            return true;
-        }
-        return droppedCache.getDataset(dataverseName, datasetName) != null;
-    }
-
-    public boolean indexIsDropped(DataverseName dataverseName, String datasetName, String indexName) {
-        if (droppedCache.getDataverse(dataverseName) != null) {
-            return true;
-        }
-        if (droppedCache.getDataset(dataverseName, datasetName) != null) {
-            return true;
-        }
-        return droppedCache.getIndex(dataverseName, datasetName, indexName) != null;
-    }
-
-    public boolean datatypeIsDropped(DataverseName dataverseName, String datatypeName) {
-        if (droppedCache.getDataverse(dataverseName) != null) {
-            return true;
-        }
-        return droppedCache.getDatatype(dataverseName, datatypeName) != null;
-    }
-
-    public boolean nodeGroupIsDropped(String nodeGroup) {
-        return droppedCache.getNodeGroup(nodeGroup) != null;
-    }
-
-    public boolean functionIsDropped(FunctionSignature functionSignature) {
-        return droppedCache.getFunction(functionSignature) != null;
-    }
-
-    public boolean fullTextConfigIsDropped(DataverseName dataverseName, String configName) {
-        return droppedCache.getFullTextConfig(dataverseName, configName) != null;
-    }
-
-    public boolean fullTextFilterIsDropped(DataverseName dataverseName, String filterName) {
-        return droppedCache.getFullTextFilter(dataverseName, filterName) != null;
-    }
-
-    public List<MetadataLogicalOperation> getOpLog() {
-        return opLog;
-    }
-
-    public void addFeedPolicy(FeedPolicyEntity feedPolicy) {
-        droppedCache.dropFeedPolicy(feedPolicy);
-        logAndApply(new MetadataLogicalOperation(feedPolicy, true));
-
-    }
-
-    public void addFeed(Feed feed) {
-        droppedCache.dropFeedIfExists(feed);
-        logAndApply(new MetadataLogicalOperation(feed, true));
-    }
-
     public void dropFeed(Feed feed) {
         droppedCache.addFeedIfNotExists(feed);
         logAndApply(new MetadataLogicalOperation(feed, false));
     }
 
-    public void addFeedConnection(FeedConnection feedConnection) {
-        droppedCache.dropFeedConnection(feedConnection);
-        logAndApply(new MetadataLogicalOperation(feedConnection, true));
-    }
-
     public void dropFeedConnection(String database, DataverseName dataverseName, String feedName, String datasetName) {
         FeedConnection feedConnection =
                 new FeedConnection(database, dataverseName, feedName, datasetName, null, null, null, null);
@@ -314,6 +261,79 @@
         logAndApply(new MetadataLogicalOperation(feedConnection, false));
     }
 
+    public void logAndApply(MetadataLogicalOperation op) {
+        opLog.add(op);
+        doOperation(op);
+    }
+
+    public boolean databaseIsDropped(String databaseName) {
+        return droppedCache.getDatabase(databaseName) != null;
+    }
+
+    public boolean dataverseIsDropped(String databaseName, DataverseName dataverseName) {
+        if (droppedCache.getDatabase(databaseName) != null) {
+            return true;
+        }
+        return droppedCache.getDataverse(databaseName, dataverseName) != null;
+    }
+
+    public boolean datasetIsDropped(String databaseName, DataverseName dataverseName, String datasetName) {
+        if (droppedCache.getDatabase(databaseName) != null) {
+            return true;
+        }
+        if (droppedCache.getDataverse(databaseName, dataverseName) != null) {
+            return true;
+        }
+        return droppedCache.getDataset(databaseName, dataverseName, datasetName) != null;
+    }
+
+    public boolean indexIsDropped(String databaseName, DataverseName dataverseName, String datasetName,
+            String indexName) {
+        if (droppedCache.getDatabase(databaseName) != null) {
+            return true;
+        }
+        if (droppedCache.getDataverse(databaseName, dataverseName) != null) {
+            return true;
+        }
+        if (droppedCache.getDataset(databaseName, dataverseName, datasetName) != null) {
+            return true;
+        }
+        return droppedCache.getIndex(databaseName, dataverseName, datasetName, indexName) != null;
+    }
+
+    public boolean datatypeIsDropped(String databaseName, DataverseName dataverseName, String datatypeName) {
+        if (droppedCache.getDatabase(databaseName) != null) {
+            return true;
+        }
+        if (droppedCache.getDataverse(databaseName, dataverseName) != null) {
+            return true;
+        }
+        return droppedCache.getDatatype(databaseName, dataverseName, datatypeName) != null;
+    }
+
+    public boolean nodeGroupIsDropped(String nodeGroup) {
+        return droppedCache.getNodeGroup(nodeGroup) != null;
+    }
+
+    public boolean functionIsDropped(FunctionSignature functionSignature) {
+        //TODO(DB): check database and dataverse first?
+        return droppedCache.getFunction(functionSignature) != null;
+    }
+
+    public boolean fullTextConfigIsDropped(String databaseName, DataverseName dataverseName, String configName) {
+        //TODO(DB): check database and dataverse first?
+        return droppedCache.getFullTextConfig(databaseName, dataverseName, configName) != null;
+    }
+
+    public boolean fullTextFilterIsDropped(String databaseName, DataverseName dataverseName, String filterName) {
+        //TODO(DB): check database and dataverse first?
+        return droppedCache.getFullTextFilter(databaseName, dataverseName, filterName) != null;
+    }
+
+    public List<MetadataLogicalOperation> getOpLog() {
+        return opLog;
+    }
+
     @Override
     public void clear() {
         super.clear();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 074a5cd..226dd70 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -90,6 +90,8 @@
      */
     void abortTransaction(MetadataTransactionContext ctx) throws ACIDException, RemoteException;
 
+    Database getDatabase(MetadataTransactionContext ctx, String database) throws AlgebricksException;
+
     void addDatabase(MetadataTransactionContext ctx, Database database) throws AlgebricksException;
 
     void dropDatabase(MetadataTransactionContext ctx, String databaseName) throws AlgebricksException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index c167dd6..b738533 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -83,6 +83,8 @@
 
     void dropDatabase(TxnId txnId, String databaseName) throws AlgebricksException, RemoteException;
 
+    Database getDatabase(TxnId txnId, String databaseName) throws AlgebricksException, RemoteException;
+
     /**
      * Inserts a new dataverse into the metadata, acquiring local locks on behalf of
      * the given transaction id.
@@ -98,17 +100,19 @@
     void addDataverse(TxnId txnId, Dataverse dataverse) throws AlgebricksException, RemoteException;
 
     /**
-     * Retrieves all dataverses, acquiring local locks on behalf of the given
-     * transaction id.
+     * Deletes the dataverse with given name, and all it's associated datasets,
+     * indexes, and types, acquiring local locks on behalf of the given transaction
+     * id.
      *
      * @param txnId
      *            A globally unique id for an active metadata transaction.
-     * @return A list of dataverse instances.
+     * @param dataverseName
+     *            Name of the dataverse to drop.
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
-     * @throws RemoteException remote exception
      */
-    List<Dataverse> getDataverses(TxnId txnId) throws AlgebricksException, RemoteException;
+    void dropDataverse(TxnId txnId, String database, DataverseName dataverseName)
+            throws AlgebricksException, RemoteException;
 
     /**
      * Retrieves a dataverse with given name, acquiring local locks on behalf of the
@@ -127,6 +131,19 @@
             throws AlgebricksException, RemoteException;
 
     /**
+     * Retrieves all dataverses, acquiring local locks on behalf of the given
+     * transaction id.
+     *
+     * @param txnId
+     *            A globally unique id for an active metadata transaction.
+     * @return A list of dataverse instances.
+     * @throws AlgebricksException
+     *             For example, if the dataverse does not exist.
+     * @throws RemoteException remote exception
+     */
+    List<Dataverse> getDataverses(TxnId txnId) throws AlgebricksException, RemoteException;
+
+    /**
      * Retrieves all datasets belonging to the given dataverse, acquiring local
      * locks on behalf of the given transaction id.
      *
@@ -142,21 +159,6 @@
             throws AlgebricksException, RemoteException;
 
     /**
-     * Deletes the dataverse with given name, and all it's associated datasets,
-     * indexes, and types, acquiring local locks on behalf of the given transaction
-     * id.
-     *
-     * @param txnId
-     *            A globally unique id for an active metadata transaction.
-     * @param dataverseName
-     *            Name of the dataverse to drop.
-     * @throws AlgebricksException
-     *             For example, if the dataverse does not exist.
-     */
-    void dropDataverse(TxnId txnId, String database, DataverseName dataverseName)
-            throws AlgebricksException, RemoteException;
-
-    /**
      * Returns {@code true} if given dataverse is not empty
      * (i.e. contains any datatypes, datasets or any other entities).
      *  @param txnId