[ASTERIXDB-3259][MTD] Drop database objects when dropping DATABASE

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Drop all metadata objects of a database from the metadata collections
when dropping the database.

- Include the database name when checking dependencies of metadata
  objects.

Change-Id: Ifd629b9d324716ea1e903292494120e6e768c172
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17836
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index e895465..3871ffb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -699,7 +699,95 @@
     @Override
     public void dropDatabase(TxnId txnId, String databaseName) throws AlgebricksException, RemoteException {
         try {
-            //TODO(DB): delete from other metadata collections
+            //TODO(DB): review
+            confirmDatabaseCanBeDeleted(txnId, databaseName);
+
+            // Drop all feeds and connections in this database.
+            // Feeds may depend on datatypes and adapters
+            List<Feed> databaseFeeds = getDatabaseFeeds(txnId, databaseName);
+            for (Feed feed : databaseFeeds) {
+                List<FeedConnection> feedConnections =
+                        getFeedConnections(txnId, databaseName, feed.getDataverseName(), feed.getFeedName());
+                for (FeedConnection feedConnection : feedConnections) {
+                    dropFeedConnection(txnId, databaseName, feedConnection.getDataverseName(), feed.getFeedName(),
+                            feedConnection.getDatasetName());
+                }
+                dropFeed(txnId, databaseName, feed.getDataverseName(), feed.getFeedName());
+            }
+
+            // Drop all feed ingestion policies in this database.
+            List<FeedPolicyEntity> feedPolicies = getDatabaseFeedPolicies(txnId, databaseName);
+            for (FeedPolicyEntity feedPolicy : feedPolicies) {
+                dropFeedPolicy(txnId, databaseName, feedPolicy.getDataverseName(), feedPolicy.getPolicyName());
+            }
+
+            // Drop all functions in this database.
+            // Functions may depend on libraries, datasets, functions, datatypes, synonyms
+            // As a side effect, acquires an S lock on the 'Function' dataset on behalf of txnId.
+            List<Function> databaseFunctions = getDatabaseFunctions(txnId, databaseName);
+            for (Function function : databaseFunctions) {
+                dropFunction(txnId, function.getSignature(), true);
+            }
+
+            // Drop all adapters in this database.
+            // Adapters depend on libraries.
+            // As a side effect, acquires an S lock on the 'Adapter' dataset on behalf of txnId.
+            List<DatasourceAdapter> databaseAdapters = getDatabaseAdapters(txnId, databaseName);
+            for (DatasourceAdapter adapter : databaseAdapters) {
+                dropAdapter(txnId, databaseName, adapter.getAdapterIdentifier().getDataverseName(),
+                        adapter.getAdapterIdentifier().getName());
+            }
+
+            // Drop all libraries in this database.
+            List<Library> databaseLibraries = getDatabaseLibraries(txnId, databaseName);
+            for (Library lib : databaseLibraries) {
+                dropLibrary(txnId, lib.getDatabaseName(), lib.getDataverseName(), lib.getName());
+            }
+
+            // Drop all synonyms in this database.
+            List<Synonym> databaseSynonyms = getDatabaseSynonyms(txnId, databaseName);
+            for (Synonym synonym : databaseSynonyms) {
+                dropSynonym(txnId, databaseName, synonym.getDataverseName(), synonym.getSynonymName(), true);
+            }
+
+            // Drop all datasets and indexes in this database.
+            // Datasets depend on datatypes
+            List<Dataset> databaseDatasets = getDatabaseDatasets(txnId, databaseName);
+            for (Dataset ds : databaseDatasets) {
+                dropDataset(txnId, databaseName, ds.getDataverseName(), ds.getDatasetName(), true);
+            }
+
+            // Drop full-text configs in this database.
+            // Note that full-text configs are utilized by the index, and we need to always drop index first
+            // and then full-text config
+            List<FullTextConfigMetadataEntity> configMetadataEntities = getDatabaseFullTextConfigs(txnId, databaseName);
+            for (FullTextConfigMetadataEntity configMetadataEntity : configMetadataEntities) {
+                dropFullTextConfigDescriptor(txnId, databaseName,
+                        configMetadataEntity.getFullTextConfig().getDataverseName(),
+                        configMetadataEntity.getFullTextConfig().getName(), true);
+            }
+
+            // Drop full-text filters in this database.
+            // Note that full-text filters are utilized by the full-text configs,
+            // and we need to always drop full-text configs first
+            // and then full-text filter
+            List<FullTextFilterMetadataEntity> filters = getDatabaseFullTextFilters(txnId, databaseName);
+            for (FullTextFilterMetadataEntity filter : filters) {
+                dropFullTextFilterDescriptor(txnId, databaseName, filter.getFullTextFilter().getDataverseName(),
+                        filter.getFullTextFilter().getName(), true);
+            }
+
+            // Drop all types in this database.
+            // As a side effect, acquires an S lock on the 'datatype' dataset on behalf of txnId.
+            List<Datatype> databaseDatatypes = getDatabaseDatatypes(txnId, databaseName);
+            for (Datatype dataType : databaseDatatypes) {
+                forceDropDatatype(txnId, databaseName, dataType.getDataverseName(), dataType.getDatatypeName());
+            }
+
+            List<Dataverse> databaseDataverses = getDatabaseDataverses(txnId, databaseName);
+            for (Dataverse dataverse : databaseDataverses) {
+                forceDropDataverse(txnId, databaseName, dataverse.getDataverseName());
+            }
 
             // delete the database entry from the 'Database' collection
             // as a side effect, acquires an S lock on the 'Database' collection on behalf of txnId
@@ -805,10 +893,7 @@
 
             // Delete the dataverse entry from the 'dataverse' dataset.
             // As a side effect, acquires an S lock on the 'dataverse' dataset on behalf of txnId.
-            ITupleReference searchKey = createTuple(database, dataverseName);
-            ITupleReference tuple =
-                    getTupleToBeDeleted(txnId, mdIndexesProvider.getDataverseEntity().getIndex(), searchKey);
-            deleteTupleFromIndex(txnId, mdIndexesProvider.getDataverseEntity().getIndex(), tuple);
+            forceDropDataverse(txnId, database, dataverseName);
         } catch (HyracksDataException e) {
             if (e.matches(ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY)) {
                 throw new AsterixException(org.apache.asterix.common.exceptions.ErrorCode.UNKNOWN_DATAVERSE, e,
@@ -819,6 +904,14 @@
         }
     }
 
+    private void forceDropDataverse(TxnId txnId, String database, DataverseName dataverseName)
+            throws AlgebricksException, HyracksDataException {
+        ITupleReference searchKey = createTuple(database, dataverseName);
+        ITupleReference tuple =
+                getTupleToBeDeleted(txnId, mdIndexesProvider.getDataverseEntity().getIndex(), searchKey);
+        deleteTupleFromIndex(txnId, mdIndexesProvider.getDataverseEntity().getIndex(), tuple);
+    }
+
     @Override
     public boolean isDataverseNotEmpty(TxnId txnId, String database, DataverseName dataverseName)
             throws AlgebricksException {
@@ -1077,6 +1170,32 @@
         }
     }
 
+    private List<Dataverse> getDatabaseDataverses(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            DataverseTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataverseTupleTranslator(false);
+            IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Dataverse> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getDataverseEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
+    private List<Dataset> getDatabaseDatasets(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(false);
+            IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Dataset> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getDatasetEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     @Override
     public List<Feed> getDataverseFeeds(TxnId txnId, String database, DataverseName dataverseName)
             throws AlgebricksException {
@@ -1092,6 +1211,19 @@
         }
     }
 
+    private List<Feed> getDatabaseFeeds(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            FeedTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getFeedTupleTranslator(false);
+            IValueExtractor<Feed> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Feed> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getFeedEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     @Override
     public List<Library> getDataverseLibraries(TxnId txnId, String database, DataverseName dataverseName)
             throws AlgebricksException {
@@ -1107,6 +1239,19 @@
         }
     }
 
+    private List<Library> getDatabaseLibraries(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            LibraryTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getLibraryTupleTranslator(false);
+            IValueExtractor<Library> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Library> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getLibraryEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     private List<Datatype> getDataverseDatatypes(TxnId txnId, String database, DataverseName dataverseName)
             throws AlgebricksException {
         try {
@@ -1122,6 +1267,20 @@
         }
     }
 
+    private List<Datatype> getDatabaseDatatypes(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            DatatypeTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getDataTypeTupleTranslator(txnId, this, false);
+            IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Datatype> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getDatatypeEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     private List<FullTextConfigMetadataEntity> getDataverseFullTextConfigs(TxnId txnId, String database,
             DataverseName dataverseName) throws AlgebricksException {
         ITupleReference searchKey = createTuple(database, dataverseName);
@@ -1139,6 +1298,23 @@
         return results;
     }
 
+    private List<FullTextConfigMetadataEntity> getDatabaseFullTextConfigs(TxnId txnId, String database)
+            throws AlgebricksException {
+        ITupleReference searchKey = createTuple(database);
+        FullTextConfigMetadataEntityTupleTranslator tupleReaderWriter =
+                tupleTranslatorProvider.getFullTextConfigTupleTranslator(true);
+        IValueExtractor<FullTextConfigMetadataEntity> valueExtractor =
+                new MetadataEntityValueExtractor<>(tupleReaderWriter);
+        List<FullTextConfigMetadataEntity> results = new ArrayList<>();
+        try {
+            searchIndex(txnId, mdIndexesProvider.getFullTextConfigEntity().getIndex(), searchKey, valueExtractor,
+                    results);
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+        return results;
+    }
+
     private List<FullTextFilterMetadataEntity> getDataverseFullTextFilters(TxnId txnId, String database,
             DataverseName dataverseName) throws AlgebricksException {
         ITupleReference searchKey = createTuple(database, dataverseName);
@@ -1156,6 +1332,23 @@
         return results;
     }
 
+    private List<FullTextFilterMetadataEntity> getDatabaseFullTextFilters(TxnId txnId, String database)
+            throws AlgebricksException {
+        ITupleReference searchKey = createTuple(database);
+        FullTextFilterMetadataEntityTupleTranslator tupleReaderWriter =
+                tupleTranslatorProvider.getFullTextFilterTupleTranslator(true);
+        IValueExtractor<FullTextFilterMetadataEntity> valueExtractor =
+                new MetadataEntityValueExtractor<>(tupleReaderWriter);
+        List<FullTextFilterMetadataEntity> results = new ArrayList<>();
+        try {
+            searchIndex(txnId, mdIndexesProvider.getFullTextFilterEntity().getIndex(), searchKey, valueExtractor,
+                    results);
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+        return results;
+    }
+
     @Override
     public Dataset getDataset(TxnId txnId, String database, DataverseName dataverseName, String datasetName)
             throws AlgebricksException {
@@ -1230,6 +1423,84 @@
         }
     }
 
+    private void confirmDatabaseCanBeDeleted(TxnId txnId, String database) throws AlgebricksException {
+        // if a dataset from a DIFFERENT database uses a type from this database throw an error
+        List<Dataset> datasets = getAllDatasets(txnId);
+        for (Dataset otherDataset : datasets) {
+            if (otherDataset.getDatabaseName().equals(database)) {
+                continue;
+            }
+            if (otherDataset.getItemTypeDatabaseName().equals(database)) {
+                throw new AsterixException(
+                        org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS, "type",
+                        TypeUtil.getFullyQualifiedDisplayName(otherDataset.getItemTypeDataverseName(),
+                                otherDataset.getItemTypeName()),
+                        dataset(), DatasetUtil.getFullyQualifiedDisplayName(otherDataset));
+            }
+            if (otherDataset.hasMetaPart() && otherDataset.getMetaItemTypeDatabaseName().equals(database)) {
+                throw new AsterixException(
+                        org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS, "type",
+                        TypeUtil.getFullyQualifiedDisplayName(otherDataset.getMetaItemTypeDataverseName(),
+                                otherDataset.getMetaItemTypeName()),
+                        dataset(), DatasetUtil.getFullyQualifiedDisplayName(otherDataset));
+            }
+            if (otherDataset.getDatasetType() == DatasetType.VIEW) {
+                ViewDetails viewDetails = (ViewDetails) otherDataset.getDatasetDetails();
+                List<DependencyKind> dependenciesSchema = ViewDetails.DEPENDENCIES_SCHEMA;
+                List<List<DependencyFullyQualifiedName>> dependencies = viewDetails.getDependencies();
+                for (int i = 0, n = dependencies.size(); i < n; i++) {
+                    for (DependencyFullyQualifiedName dependency : dependencies.get(i)) {
+                        if (dependency.getDatabaseName().equals(database)) {
+                            DependencyKind dependencyKind = dependenciesSchema.get(i);
+                            throw new AsterixException(
+                                    org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS,
+                                    dependencyKind, dependencyKind.getDependencyDisplayName(dependency), "view",
+                                    DatasetUtil.getFullyQualifiedDisplayName(otherDataset));
+                        }
+                    }
+                }
+            }
+        }
+
+        // if a function from a DIFFERENT database uses datasets, functions, datatypes, or synonyms from this database
+        // throw an error
+        List<Function> functions = getAllFunctions(txnId);
+        for (Function otherFunction : functions) {
+            if (otherFunction.getDatabaseName().equals(database)) {
+                continue;
+            }
+            List<DependencyKind> dependenciesSchema = Function.DEPENDENCIES_SCHEMA;
+            List<List<DependencyFullyQualifiedName>> dependencies = otherFunction.getDependencies();
+            for (int i = 0, n = dependencies.size(); i < n; i++) {
+                for (DependencyFullyQualifiedName dependency : dependencies.get(i)) {
+                    if (dependency.getDatabaseName().equals(database)) {
+                        DependencyKind dependencyKind = dependenciesSchema.get(i);
+                        throw new AsterixException(
+                                org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS,
+                                dependencyKind, dependencyKind.getDependencyDisplayName(dependency), "function",
+                                otherFunction.getSignature());
+                    }
+                }
+            }
+        }
+
+        // if a feed connection from a DIFFERENT database applies a function from this database then throw an error
+        List<FeedConnection> feedConnections = getAllFeedConnections(txnId);
+        for (FeedConnection feedConnection : feedConnections) {
+            if (database.equals(feedConnection.getDatabaseName())) {
+                continue;
+            }
+            for (FunctionSignature functionSignature : feedConnection.getAppliedFunctions()) {
+                if (database.equals(functionSignature.getDatabaseName())) {
+                    throw new AsterixException(
+                            org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS,
+                            "function", functionSignature, "feed connection", MetadataUtil.getFullyQualifiedDisplayName(
+                                    feedConnection.getDataverseName(), feedConnection.getFeedName()));
+                }
+            }
+        }
+    }
+
     private void confirmDataverseCanBeDeleted(TxnId txnId, String database, DataverseName dataverseName)
             throws AlgebricksException {
         // If a dataset from a DIFFERENT dataverse uses a type from this dataverse throw an error
@@ -1238,14 +1509,16 @@
             if (dataset.getDataverseName().equals(dataverseName) && dataset.getDatabaseName().equals(database)) {
                 continue;
             }
-            if (dataset.getItemTypeDataverseName().equals(dataverseName)) {
+            if (dataset.getItemTypeDataverseName().equals(dataverseName)
+                    && dataset.getItemTypeDatabaseName().equals(database)) {
                 throw new AsterixException(
                         org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS, "type",
                         TypeUtil.getFullyQualifiedDisplayName(dataset.getItemTypeDataverseName(),
                                 dataset.getItemTypeName()),
                         dataset(), DatasetUtil.getFullyQualifiedDisplayName(dataset));
             }
-            if (dataset.hasMetaPart() && dataset.getMetaItemTypeDataverseName().equals(dataverseName)) {
+            if (dataset.hasMetaPart() && dataset.getMetaItemTypeDataverseName().equals(dataverseName)
+                    && dataset.getMetaItemTypeDatabaseName().equals(database)) {
                 throw new AsterixException(
                         org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS, "type",
                         TypeUtil.getFullyQualifiedDisplayName(dataset.getMetaItemTypeDataverseName(),
@@ -1258,7 +1531,8 @@
                 List<List<DependencyFullyQualifiedName>> dependencies = viewDetails.getDependencies();
                 for (int i = 0, n = dependencies.size(); i < n; i++) {
                     for (DependencyFullyQualifiedName dependency : dependencies.get(i)) {
-                        if (dependency.getDataverseName().equals(dataverseName)) {
+                        if (dependency.getDataverseName().equals(dataverseName)
+                                && dependency.getDatabaseName().equals(database)) {
                             DependencyKind dependencyKind = dependenciesSchema.get(i);
                             throw new AsterixException(
                                     org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS,
@@ -1274,14 +1548,15 @@
         // throw an error
         List<Function> functions = getAllFunctions(txnId);
         for (Function function : functions) {
-            if (function.getDataverseName().equals(dataverseName)) {
+            if (function.getDataverseName().equals(dataverseName) && function.getDatabaseName().equals(database)) {
                 continue;
             }
             List<DependencyKind> dependenciesSchema = Function.DEPENDENCIES_SCHEMA;
             List<List<DependencyFullyQualifiedName>> dependencies = function.getDependencies();
             for (int i = 0, n = dependencies.size(); i < n; i++) {
                 for (DependencyFullyQualifiedName dependency : dependencies.get(i)) {
-                    if (dependency.getDataverseName().equals(dataverseName)) {
+                    if (dependency.getDataverseName().equals(dataverseName)
+                            && dependency.getDatabaseName().equals(database)) {
                         DependencyKind dependencyKind = dependenciesSchema.get(i);
                         throw new AsterixException(
                                 org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS,
@@ -1295,11 +1570,13 @@
         // If a feed connection from a DIFFERENT dataverse applies a function from this dataverse then throw an error
         List<FeedConnection> feedConnections = getAllFeedConnections(txnId);
         for (FeedConnection feedConnection : feedConnections) {
-            if (dataverseName.equals(feedConnection.getDataverseName())) {
+            if (dataverseName.equals(feedConnection.getDataverseName())
+                    && database.equals(feedConnection.getDatabaseName())) {
                 continue;
             }
             for (FunctionSignature functionSignature : feedConnection.getAppliedFunctions()) {
-                if (dataverseName.equals(functionSignature.getDataverseName())) {
+                if (dataverseName.equals(functionSignature.getDataverseName())
+                        && database.equals(functionSignature.getDatabaseName())) {
                     throw new AsterixException(
                             org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_DATAVERSE_DEPENDENT_EXISTS,
                             "function", functionSignature, "feed connection", MetadataUtil.getFullyQualifiedDisplayName(
@@ -1362,6 +1639,7 @@
                 if (functionObjectDependencies != null) {
                     for (DependencyFullyQualifiedName dependency : functionObjectDependencies) {
                         if (dependency.getDataverseName().equals(dataverseName)
+                                && dependency.getDatabaseName().equals(database)
                                 && dependency.getSubName1().equals(objectName)
                                 && (objectArg == null || objectArg.equals(dependency.getSubName2()))) {
                             throw new AsterixException(
@@ -1400,6 +1678,7 @@
                     if (viewObjectDependencies != null) {
                         for (DependencyFullyQualifiedName dependency : viewObjectDependencies) {
                             if (dependency.getDataverseName().equals(dataverseName)
+                                    && dependency.getDatabaseName().equals(database)
                                     && dependency.getSubName1().equals(objectName)
                                     && (objectArg == null || objectArg.equals(dependency.getSubName2()))) {
                                 throw new AsterixException(
@@ -1434,7 +1713,8 @@
                 if (Index.IndexCategory.of(index.getIndexType()) == Index.IndexCategory.TEXT) {
                     String indexConfigName = ((Index.TextIndexDetails) index.getIndexDetails()).getFullTextConfigName();
                     if (index.getDataverseName().equals(dataverseNameFullTextConfig)
-                            && !Strings.isNullOrEmpty(indexConfigName) && indexConfigName.equals(configName)) {
+                            && index.getDatabaseName().equals(database) && !Strings.isNullOrEmpty(indexConfigName)
+                            && indexConfigName.equals(configName)) {
                         throw new AsterixException(
                                 org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_OBJECT_DEPENDENT_EXISTS,
                                 "full-text config",
@@ -1514,8 +1794,10 @@
         for (Dataset dataset : datasets) {
             if ((dataset.getItemTypeName().equals(datatypeName)
                     && dataset.getItemTypeDataverseName().equals(dataverseName))
+                    && dataset.getItemTypeDatabaseName().equals(database)
                     || ((dataset.hasMetaPart() && dataset.getMetaItemTypeName().equals(datatypeName)
-                            && dataset.getMetaItemTypeDataverseName().equals(dataverseName)))) {
+                            && dataset.getMetaItemTypeDataverseName().equals(dataverseName)
+                            && dataset.getMetaItemTypeDatabaseName().equals(database)))) {
                 throw new AsterixException(
                         org.apache.asterix.common.exceptions.ErrorCode.CANNOT_DROP_OBJECT_DEPENDENT_EXISTS, "type",
                         TypeUtil.getFullyQualifiedDisplayName(dataverseName, datatypeName), dataset(),
@@ -1539,8 +1821,9 @@
         IAType typeToBeDropped = dataTypeToBeDropped.getDatatype();
         List<Datatype> datatypes = getAllDatatypes(txnId);
         for (Datatype dataType : datatypes) {
+
             // skip types in different dataverses as well as the type to be dropped itself
-            if (!dataType.getDataverseName().equals(dataverseName)
+            if (!dataType.getDataverseName().equals(dataverseName) || !dataType.getDatabaseName().equals(database)
                     || dataType.getDatatype().getTypeName().equals(datatypeName)) {
                 continue;
             }
@@ -1706,6 +1989,10 @@
         return getFunctionsImpl(txnId, createTuple(database, dataverseName));
     }
 
+    private List<Function> getDatabaseFunctions(TxnId txnId, String database) throws AlgebricksException {
+        return getFunctionsImpl(txnId, createTuple(database));
+    }
+
     private List<Function> getFunctionsImpl(TxnId txnId, ITupleReference searchKey) throws AlgebricksException {
         try {
             FunctionTupleTranslator tupleReaderWriter =
@@ -2123,6 +2410,21 @@
         }
     }
 
+    private List<DatasourceAdapter> getDatabaseAdapters(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            DatasourceAdapterTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getAdapterTupleTranslator(false);
+            IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<DatasourceAdapter> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getDatasourceAdapterEntity().getIndex(), searchKey, valueExtractor,
+                    results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     @Override
     public void addLibrary(TxnId txnId, Library library) throws AlgebricksException {
         try {
@@ -2398,6 +2700,19 @@
         }
     }
 
+    private List<FeedPolicyEntity> getDatabaseFeedPolicies(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            FeedPolicyTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getFeedPolicyTupleTranslator(false);
+            IValueExtractor<FeedPolicyEntity> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<FeedPolicyEntity> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getFeedPolicyEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     @Override
     public void addExternalFile(TxnId txnId, ExternalFile externalFile) throws AlgebricksException {
         try {
@@ -2619,6 +2934,19 @@
         }
     }
 
+    private List<Synonym> getDatabaseSynonyms(TxnId txnId, String database) throws AlgebricksException {
+        try {
+            ITupleReference searchKey = createTuple(database);
+            SynonymTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getSynonymTupleTranslator(false);
+            IValueExtractor<Synonym> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            List<Synonym> results = new ArrayList<>();
+            searchIndex(txnId, mdIndexesProvider.getSynonymEntity().getIndex(), searchKey, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     @Override
     public void updateDataset(TxnId txnId, Dataset dataset) throws AlgebricksException {
         try {