[NO ISSUE][COMP] Refactor drop dataverse and drop function

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Refactor feed handling when processing drop dataverse
  and drop function
- Move feed dependency checking from QueryTranslator
  to MetadataNode to align with other entities

Change-Id: I493dfffd77c596bb3485eccb00b417d40f47b647
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7066
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Xikui Wang <xkkwww@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a7df432..b3b55a8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1443,6 +1443,8 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        List<FeedEventsListener> feedsToStop = new ArrayList<>();
+        List<Dataset> externalDatasetsToDeregister = new ArrayList<>();
         List<JobSpecification> jobsToExecute = new ArrayList<>();
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
@@ -1454,19 +1456,8 @@
                     throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
                 }
             }
-            // # check whether any function in current dataverse is being used by others
-            List<Function> functionsInDataverse =
-                    MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dataverseName);
-            for (Function function : functionsInDataverse) {
-                if (isFunctionUsed(mdTxnCtx, function.getSignature(), dataverseName)) {
-                    throw new MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE, sourceLoc,
-                            function.getDataverseName() + "." + function.getName() + "@" + function.getArity());
-                }
-            }
 
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-            // # disconnect all feeds from any datasets in the dataverse.
+            // #. prepare jobs which will drop corresponding feed storage
             ActiveNotificationHandler activeEventHandler =
                     (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
             IActiveEntityEventsListener[] activeListeners = activeEventHandler.getEventListeners();
@@ -1474,47 +1465,34 @@
                 EntityId activeEntityId = listener.getEntityId();
                 if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
                         && activeEntityId.getDataverseName().equals(dataverseName)) {
-                    if (listener.getState() != ActivityState.STOPPED) {
-                        ((ActiveEntityEventsListener) listener).stop(metadataProvider);
-                    }
                     FeedEventsListener feedListener = (FeedEventsListener) listener;
-                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                    bActiveTxn = true;
-                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                    doDropFeed(hcc, metadataProvider, feedListener.getFeed(), sourceLoc);
-                    MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
-                    bActiveTxn = false;
+                    feedsToStop.add(feedListener);
+                    jobsToExecute
+                            .add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider, feedListener.getFeed()));
                 }
             }
-            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            bActiveTxn = true;
-            metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. prepare jobs which will drop corresponding datasets with indexes.
             List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
             for (Dataset dataset : datasets) {
                 String datasetName = dataset.getDatasetName();
+                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 DatasetType dsType = dataset.getDatasetType();
                 if (dsType == DatasetType.INTERNAL) {
-                    List<Index> indexes =
-                            MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (Index index : indexes) {
                         jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc));
                     }
-                } else {
-                    // External dataset
-                    List<Index> indexes =
-                            MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
-                    for (int k = 0; k < indexes.size(); k++) {
-                        if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
+                } else if (dsType == DatasetType.EXTERNAL) {
+                    for (Index index : indexes) {
+                        if (ExternalIndexingOperations.isFileIndex(index)) {
                             jobsToExecute.add(
                                     ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, dataset));
                         } else {
-                            jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset,
-                                    sourceLoc));
+                            jobsToExecute
+                                    .add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc));
                         }
                     }
-                    ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(dataset);
+                    externalDatasetsToDeregister.add(dataset);
                 }
             }
 
@@ -1529,8 +1507,8 @@
 
             // #. mark PendingDropOp on the dataverse record by
             // first, deleting the dataverse record from the DATAVERSE_DATASET
-            // second, inserting the dataverse record with the PendingDropOp value into the
-            // DATAVERSE_DATASET
+            // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
+            // Note: the delete operation fails if the dataverse cannot be deleted due to metadata dependencies
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), MetadataUtil.PENDING_DROP_OP));
@@ -1539,6 +1517,17 @@
             bActiveTxn = false;
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
+            for (Dataset externalDataset : externalDatasetsToDeregister) {
+                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(externalDataset);
+            }
+
+            for (FeedEventsListener feedListener : feedsToStop) {
+                if (feedListener.getState() != ActivityState.STOPPED) {
+                    feedListener.stop(metadataProvider);
+                }
+                feedListener.unregister();
+            }
+
             for (JobSpecification jobSpec : jobsToExecute) {
                 runJob(hcc, jobSpec);
             }
@@ -2159,27 +2148,6 @@
         }
     }
 
-    protected boolean isFunctionUsed(MetadataTransactionContext ctx, FunctionSignature signature,
-            DataverseName currentDataverse) throws AlgebricksException {
-        List<Dataverse> allDataverses = MetadataManager.INSTANCE.getDataverses(ctx);
-        for (Dataverse dataverse : allDataverses) {
-            if (dataverse.getDataverseName().equals(currentDataverse)) {
-                continue;
-            }
-            List<Feed> feeds = MetadataManager.INSTANCE.getFeeds(ctx, dataverse.getDataverseName());
-            for (Feed feed : feeds) {
-                List<FeedConnection> feedConnections = MetadataManager.INSTANCE.getFeedConections(ctx,
-                        dataverse.getDataverseName(), feed.getFeedName());
-                for (FeedConnection conn : feedConnections) {
-                    if (conn.containsFunction(signature)) {
-                        return true;
-                    }
-                }
-            }
-        }
-        return false;
-    }
-
     protected void handleFunctionDropStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
         FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
         FunctionSignature signature = stmtDropFunction.getFunctionSignature();
@@ -2200,15 +2168,12 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
-            // If function == null && stmtDropFunction.getIfExists() == true, commit txn directly.
-            if (function == null && !stmtDropFunction.getIfExists()) {
-                throw new CompilationException(ErrorCode.UNKNOWN_FUNCTION, sourceLoc, signature);
-            } else if (function != null) {
-                if (isFunctionUsed(mdTxnCtx, signature, null)) {
-                    throw new MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE, sourceLoc, signature);
-                } else {
-                    MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
+            if (function == null) {
+                if (!stmtDropFunction.getIfExists()) {
+                    throw new CompilationException(ErrorCode.UNKNOWN_FUNCTION, sourceLoc, signature);
                 }
+            } else {
+                MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 1b9c610..4b2a209 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -12454,7 +12454,8 @@
     <test-case FilePath="feeds">
       <compilation-unit name="drop-function-used-by-feed">
         <output-dir compare="Text">drop-function-used-by-feed</output-dir>
-        <expected-error>Function experiments.test_func0@1 is being used. It cannot be dropped</expected-error>
+        <expected-error>Cannot drop function experiments.test_func0@1 being used by feed connection TwitterUsers.UserFeed</expected-error>
+        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
@@ -12465,7 +12466,8 @@
     <test-case FilePath="feeds">
       <compilation-unit name="drop-dataverse-with-function-used-by-feed">
         <output-dir compare="Text">drop-dataverse-with-function-used-by-feed</output-dir>
-        <expected-error>Function fundv.test_func0@1 is being used. It cannot be dropped</expected-error>
+        <expected-error>Cannot drop dataverse. Feed connection feeddv.UserFeed depends on function fundv.test_func0@1</expected-error>
+        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="feeds">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 9cf6f9d..b277d8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -315,7 +315,6 @@
     public static final int ACTIVE_RUNTIME_IS_ALREADY_REGISTERED = 3105;
     public static final int ACTIVE_RUNTIME_IS_NOT_REGISTERED = 3106;
     public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
-    public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
     public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
     public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111;
     public static final int PARSER_COLLECTION_ITEM_CANNOT_BE_NULL = 3112;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 1cf653f..426c592 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -313,7 +313,6 @@
 3105 = %1$s is already registered
 3106 = %1$s is not registered
 3107 = Active Notification Handler is already suspended
-3109 = Function %1$s is being used. It cannot be dropped
 3110 = Feed failed while reading a new record
 3111 = Feed %1$s is not connected to any dataset
 3112 = Array/Multiset item cannot be null
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 6fd6376..d5d181f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -914,6 +914,19 @@
         }
     }
 
+    public List<FeedConnection> getAllFeedConnections(TxnId txnId) throws AlgebricksException {
+        try {
+            FeedConnectionTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getFeedConnectionTupleTranslator(false);
+            List<FeedConnection> results = new ArrayList<>();
+            IValueExtractor<FeedConnection> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
+            searchIndex(txnId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, null, valueExtractor, results);
+            return results;
+        } catch (HyracksDataException e) {
+            throw new AlgebricksException(e);
+        }
+    }
+
     private void confirmDataverseCanBeDeleted(TxnId txnId, DataverseName dataverseName) throws AlgebricksException {
         // If a dataset from a DIFFERENT dataverse
         // uses a type from this dataverse
@@ -928,10 +941,16 @@
                         "Cannot drop dataverse. Type " + dataverseName + "." + set.getItemTypeName()
                                 + " used by dataset " + set.getDataverseName() + "." + set.getDatasetName());
             }
+            if (set.getMetaItemTypeDataverseName() != null
+                    && set.getMetaItemTypeDataverseName().equals(dataverseName)) {
+                throw new AlgebricksException(
+                        "Cannot drop dataverse. Type " + dataverseName + "." + set.getMetaItemTypeName()
+                                + " used by dataset " + set.getDataverseName() + "." + set.getDatasetName());
+            }
         }
 
         // If a function from a DIFFERENT dataverse
-        // uses functions or datatypes from this dataverse
+        // uses datasets, functions or datatypes from this dataverse
         // throw an error
         List<Function> functions = getAllFunctions(txnId);
         for (Function function : functions) {
@@ -961,6 +980,23 @@
                 }
             }
         }
+
+        // If a feed connection from a DIFFERENT dataverse applies
+        // a function from this dataverse then throw an error
+        List<FeedConnection> feedConnections = getAllFeedConnections(txnId);
+        for (FeedConnection feedConnection : feedConnections) {
+            if (dataverseName.equals(feedConnection.getDataverseName())) {
+                continue;
+            }
+            for (FunctionSignature functionSignature : feedConnection.getAppliedFunctions()) {
+                if (dataverseName.equals(functionSignature.getDataverseName())) {
+                    throw new AlgebricksException("Cannot drop dataverse. Feed connection "
+                            + feedConnection.getDataverseName() + "." + feedConnection.getFeedName()
+                            + " depends on function " + functionSignature.getDataverseName() + "."
+                            + functionSignature.getName() + "@" + functionSignature.getArity());
+                }
+            }
+        }
     }
 
     private void confirmFunctionCanBeDeleted(TxnId txnId, FunctionSignature signature) throws AlgebricksException {
@@ -976,6 +1012,15 @@
                 }
             }
         }
+
+        // if any other feed connection uses this function, throw an error
+        List<FeedConnection> feedConnections = getAllFeedConnections(txnId);
+        for (FeedConnection feedConnection : feedConnections) {
+            if (feedConnection.containsFunction(signature)) {
+                throw new AlgebricksException("Cannot drop function " + signature + " being used by feed connection "
+                        + feedConnection.getDatasetName() + "." + feedConnection.getFeedName());
+            }
+        }
     }
 
     private void confirmDatasetCanBeDeleted(TxnId txnId, DataverseName dataverseName, String datasetName)