[ASTERIXDB-3259][MTD] Prepare steps for dropping database artifacts

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Drop jobs now operate at the database level. Dropping a database:
- stops the active feeds of the database and drops their storage
- drops the indexes of the database's internal datasets
- drops the database's libraries
- removes the database's storage directory on all partitions
- drops node groups that are no longer needed
- resets the active dataverse to the default one if the dropped
  database was the active one

New metadata calls (getDatabaseDatasets, getDatabaseLibraries) and
database-level split/constraint computation are added to support
these jobs.

Change-Id: Iec9b559b6de09fa191d7b54a759c05a874d15913
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17841
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 024cb17..89161b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1927,6 +1927,7 @@
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<FeedEventsListener> feedsToStop = new ArrayList<>();
         List<JobSpecification> jobsToExecute = new ArrayList<>();
+        //TODO(DB): resolve database directory
         try {
             Database database = MetadataManager.INSTANCE.getDatabase(mdTxnCtx, databaseName);
             if (database == null) {
@@ -1941,16 +1942,49 @@
             validateDatabaseStateBeforeDrop(metadataProvider, database, sourceLoc);
 
             // #. prepare jobs which will drop corresponding feed storage
-            //TODO(DB):
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
+            IActiveEntityEventsListener[] activeListeners = activeEventHandler.getEventListeners();
+            for (IActiveEntityEventsListener listener : activeListeners) {
+                EntityId activeEntityId = listener.getEntityId();
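+                // only feeds that belong to the database being dropped are stopped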
+                if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
+                        && activeEntityId.getDatabaseName().equals(databaseName)) {
+                    FeedEventsListener feedListener = (FeedEventsListener) listener;
+                    feedsToStop.add(feedListener);
+                    jobsToExecute
+                            .add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider, feedListener.getFeed()));
+                }
+            }
 
             // #. prepare jobs which will drop corresponding datasets with indexes
-            //TODO(DB):
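+            // fetch the datasets of every dataverse in the database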
+            List<Dataset> datasets = MetadataManager.INSTANCE.getDatabaseDatasets(mdTxnCtx, databaseName);
+            for (Dataset dataset : datasets) {
+                String datasetName = dataset.getDatasetName();
+                DatasetType dsType = dataset.getDatasetType();
+                switch (dsType) {
+                    case INTERNAL:
+                        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, databaseName,
+                                dataset.getDataverseName(), datasetName);
+                        for (Index index : indexes) {
+                            jobsToExecute
+                                    .add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, sourceLoc));
+                        }
+                        break;
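+                    // no storage drop jobs are prepared for external datasets and views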
+                    case EXTERNAL:
+                    case VIEW:
+                        break;
+                }
+            }
 
             // #. prepare jobs which will drop corresponding libraries
-            //TODO(DB):
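+            // all libraries in the database, across all of its dataverses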
+            List<Library> libraries = MetadataManager.INSTANCE.getDatabaseLibraries(mdTxnCtx, databaseName);
+            for (Library library : libraries) {
+                jobsToExecute.add(ExternalLibraryJobUtils.buildDropLibraryJobSpec(library.getDataverseName(),
+                        library.getName(), metadataProvider));
+            }
 
             // #. prepare jobs which will drop the database
-            //TODO(DB):
+            jobsToExecute.add(DataverseUtil.dropDatabaseJobSpec(databaseName, metadataProvider));
 
             // #. mark PendingDropOp on the database record by
             // first, deleting the database record from the 'Database' collection
@@ -1983,11 +2017,20 @@
             MetadataManager.INSTANCE.dropDatabase(mdTxnCtx, databaseName);
 
             // drop all node groups that are no longer needed
-            //TODO(DB):
+            for (Dataset dataset : datasets) {
+                String nodeGroup = dataset.getNodeGroupName();
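+                // a node group may be shared by several datasets; drop it only if it still exists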
+                lockManager.acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
+                if (MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup) != null) {
+                    MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodeGroup, true);
+                }
+            }
 
             //TODO(DB): switch active database to the DEFAULT if the dropped database is the currently active one
+            if (activeDataverse.getDatabaseName().equals(databaseName)) {
+                activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+            }
 
-            //TODO(DB): validateDatabaseDatasetsStateAfterDrop
+            validateDatasetsStateAfterNamespaceDrop(metadataProvider, mdTxnCtx, datasets);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
@@ -1997,7 +2040,9 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //TODO(DB): switch active database to the DEFAULT if the dropped database is the currently active one
+                if (activeDataverse.getDatabaseName().equals(databaseName)) {
+                    activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
+                }
 
                 // #. execute compensation operations
                 // remove the all artifacts in NC
@@ -2018,7 +2063,7 @@
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataverse(" + databaseName
+                    throw new IllegalStateException("System is inconsistent state: pending database(" + databaseName
                             + ") couldn't be removed from the metadata", e);
                 }
             }
@@ -2165,7 +2210,7 @@
                 activeDataverse = MetadataBuiltinEntities.DEFAULT_DATAVERSE;
             }
 
-            validateDataverseDatasetsStateAfterDrop(metadataProvider, mdTxnCtx, datasets);
+            validateDatasetsStateAfterNamespaceDrop(metadataProvider, mdTxnCtx, datasets);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             return true;
         } catch (Exception e) {
@@ -2220,9 +2265,9 @@
         // may be overridden by product extensions for additional checks before dropping the dataverse
     }
 
-    protected void validateDataverseDatasetsStateAfterDrop(MetadataProvider metadataProvider,
+    protected void validateDatasetsStateAfterNamespaceDrop(MetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx, List<Dataset> datasets) throws AlgebricksException {
-        // may be overridden by product extensions for additional checks after dropping the dataverse
+        // may be overridden by product extensions for additional checks after dropping a database/dataverse
     }
 
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index 61b526c..3b7cb00 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -41,4 +41,15 @@
         jobSpec.addRoot(frod);
         return jobSpec;
     }
+
+    public static JobSpecification dropDatabaseJobSpec(String database, MetadataProvider metadata) {
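+        // removes the database's storage directory on every partition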
+        JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
+        PartitioningProperties partitioningProperties = metadata.splitAndConstraints(database);
+        FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec,
+                partitioningProperties.getSplitsProvider(), false, partitioningProperties.getComputeStorageMap());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod,
+                partitioningProperties.getConstraints());
+        jobSpec.addRoot(frod);
+        return jobSpec;
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 040b489..ed3474d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -301,6 +301,21 @@
     }
 
     @Override
+    public List<Dataset> getDatabaseDatasets(MetadataTransactionContext ctx, String database)
+            throws AlgebricksException {
+        List<Dataset> databaseDatasets;
+        try {
+            Objects.requireNonNull(database);
+            // assuming that the transaction can read its own writes on the metadata node
+            databaseDatasets = metadataNode.getDatabaseDatasets(ctx.getTxnId(), database);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+        // don't update the cache to avoid checking against the transaction's uncommitted datasets
+        return databaseDatasets;
+    }
+
+    @Override
     public List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, String database,
             DataverseName dataverseName) throws AlgebricksException {
         List<Dataset> dataverseDatasets;
@@ -903,6 +918,21 @@
     }
 
     @Override
+    public List<Library> getDatabaseLibraries(MetadataTransactionContext ctx, String database)
+            throws AlgebricksException {
+        List<Library> databaseLibraries;
+        try {
+            Objects.requireNonNull(database);
+            // assuming that the transaction can read its own writes on the metadata node
+            databaseLibraries = metadataNode.getDatabaseLibraries(ctx.getTxnId(), database);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+        // don't update the cache to avoid checking against the transaction's uncommitted libraries
+        return databaseLibraries;
+    }
+
+    @Override
     public List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String database,
             DataverseName dataverseName) throws AlgebricksException {
         List<Library> dataverseLibaries;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 3871ffb..cc1251b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -1183,7 +1183,8 @@
         }
     }
 
-    private List<Dataset> getDatabaseDatasets(TxnId txnId, String database) throws AlgebricksException {
+    @Override
+    public List<Dataset> getDatabaseDatasets(TxnId txnId, String database) throws AlgebricksException {
         try {
             ITupleReference searchKey = createTuple(database);
             DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(false);
@@ -1239,7 +1240,8 @@
         }
     }
 
-    private List<Library> getDatabaseLibraries(TxnId txnId, String database) throws AlgebricksException {
+    @Override
+    public List<Library> getDatabaseLibraries(TxnId txnId, String database) throws AlgebricksException {
         try {
             ITupleReference searchKey = createTuple(database);
             LibraryTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getLibraryTupleTranslator(false);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 226dd70..f3cc811 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -132,6 +132,8 @@
     Dataverse getDataverse(MetadataTransactionContext ctx, String database, DataverseName dataverseName)
             throws AlgebricksException;
 
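+    /**
+     * Retrieves all datasets belonging to the given database, across all of its
+     * dataverses.
+     */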
+    List<Dataset> getDatabaseDatasets(MetadataTransactionContext ctx, String database) throws AlgebricksException;
+
     /**
      * Retrieves all datasets belonging to the given dataverse.
      *
@@ -671,6 +673,8 @@
     Library getLibrary(MetadataTransactionContext ctx, String database, DataverseName dataverseName, String libraryName)
             throws AlgebricksException, RemoteException;
 
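+    /**
+     * Retrieves all libraries installed in the given database.
+     */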
+    List<Library> getDatabaseLibraries(MetadataTransactionContext ctx, String database) throws AlgebricksException;
+
     /**
     * Retrieve libraries installed in a given dataverse.
      *
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index b738533..a4f2ca7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -143,6 +143,8 @@
      */
     List<Dataverse> getDataverses(TxnId txnId) throws AlgebricksException, RemoteException;
 
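+    /**
+     * Retrieves all datasets belonging to the given database.
+     */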
+    List<Dataset> getDatabaseDatasets(TxnId txnId, String database) throws AlgebricksException, RemoteException;
+
     /**
      * Retrieves all datasets belonging to the given dataverse, acquiring local
      * locks on behalf of the given transaction id.
@@ -714,6 +716,8 @@
     Library getLibrary(TxnId txnId, String database, DataverseName dataverseName, String libraryName)
             throws AlgebricksException, RemoteException;
 
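+    /**
+     * Retrieves all libraries installed in the given database.
+     */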
+    List<Library> getDatabaseLibraries(TxnId txnId, String database) throws AlgebricksException, RemoteException;
+
     /**
     * Retrieve libraries installed in a given dataverse.
      *
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f2dcde4..0ecef7d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -955,6 +955,10 @@
                 numKeyFields / 2);
     }
 
+    public PartitioningProperties splitAndConstraints(String databaseName) {
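+        // database-level variant: splits and constraints covering the database's storage directory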
+        return dataPartitioningProvider.getPartitioningProperties(databaseName);
+    }
+
     public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
         return dataPartitioningProvider.getPartitioningProperties(dataverseName);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index d763430..d99aa13 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -68,6 +68,8 @@
         }
     }
 
+    public abstract PartitioningProperties getPartitioningProperties(String databaseName);
+
     public abstract PartitioningProperties getPartitioningProperties(DataverseName dataverseName);
 
     public abstract PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
index 95dae4a..803a53b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -37,6 +37,14 @@
     }
 
     @Override
+    public PartitioningProperties getPartitioningProperties(String databaseName) {
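+        // splits cover the database directory on every cluster partition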
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
+                .getDatabaseSplitProviderAndConstraints(appCtx.getClusterStateManager(), databaseName);
+        int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+        return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+    }
+
+    @Override
     public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
                 .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index a83b8dc..2abb4f6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -45,6 +45,17 @@
     private SplitsAndConstraintsUtil() {
     }
 
+    private static FileSplit[] getDatabaseSplits(IClusterStateManager clusterStateManager, String databaseName) {
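+        // one split per cluster partition, pointing at the database directory under that partition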
+        List<FileSplit> splits = new ArrayList<>();
+        // get all partitions
+        for (ClusterPartition clusterPartition : clusterStateManager.getClusterPartitons()) {
+            File f = new File(StoragePathUtil.prepareStoragePartitionPath(clusterPartition.getPartitionId()),
+                    databaseName);
+            splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition, f.getPath()));
+        }
+        return splits.toArray(new FileSplit[] {});
+    }
+
     private static FileSplit[] getDataverseSplits(IClusterStateManager clusterStateManager,
             DataverseName dataverseName) {
         List<FileSplit> splits = new ArrayList<>();
@@ -84,6 +95,12 @@
         return splits.toArray(new FileSplit[] {});
     }
 
+    public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getDatabaseSplitProviderAndConstraints(
+            IClusterStateManager clusterStateManager, String databaseName) {
+        FileSplit[] splits = getDatabaseSplits(clusterStateManager, databaseName);
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
+
     public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getDataverseSplitProviderAndConstraints(
             IClusterStateManager clusterStateManager, DataverseName dataverseName) {
         FileSplit[] splits = getDataverseSplits(clusterStateManager, dataverseName);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
index ffcdc57..f8f967a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -49,6 +49,15 @@
     }
 
     @Override
+    public PartitioningProperties getPartitioningProperties(String databaseName) {
+        SplitComputeLocations databaseSplits = getSplits(databaseName);
+        StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+        int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
+        return PartitioningProperties.of(databaseSplits.getSplitsProvider(), databaseSplits.getConstraints(),
+                partitionsMap);
+    }
+
+    @Override
     public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
         SplitComputeLocations dataverseSplits = getDataverseSplits(dataverseName);
         StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
@@ -69,13 +78,16 @@
     }
 
     private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) {
+        return getSplits(StoragePathUtil.prepareDataverseName(dataverseName));
+    }
+
+    private SplitComputeLocations getSplits(String subPath) {
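+        // builds splits for the given storage sub-path (a database or a dataverse directory)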
         List<FileSplit> splits = new ArrayList<>();
         List<String> locations = new ArrayList<>();
         Set<Integer> uniqueLocations = new HashSet<>();
         StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
         for (int i = 0; i < storagePartitionsCounts; i++) {
-            File f = new File(StoragePathUtil.prepareStoragePartitionPath(i),
-                    StoragePathUtil.prepareDataverseName(dataverseName));
+            File f = new File(StoragePathUtil.prepareStoragePartitionPath(i), subPath);
             ComputePartition computePartition = partitionMap.getComputePartition(i);
             splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0));
             if (!uniqueLocations.contains(computePartition.getId())) {