[NO ISSUE][COMP] Support dropping dataverse only if it's empty

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add ifEmpty flag to DataverseDropStatement to indicate that
  the dataverse must be dropped only if it does not contain any
  objects. This flag is currently not supported in the grammar,
  but may be used by product extensions.
- Add validateDataverseStateBeforeDrop() method that is called
  before a dataverse is about to be dropped. It may be used
  by product extensions to perform additional checks before
  dropping the dataverse.
- Refactor handleCreateIndexStatement() to align with other
  handle*() methods (acquires locks, then calls doCreateIndex()).
- Ensure that all doDrop*() methods return a boolean value
  indicating whether the entity was actually dropped.

Change-Id: I86aca8ab02cf1bf0e0600237d3e2f71293166b08
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7204
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b2477a4..39d9c48d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -512,7 +512,7 @@
         DataverseName dvName = stmtCreateDataverse.getDataverseName();
         lockUtil.createDataverseBegin(lockManager, metadataProvider.getLocks(), dvName);
         try {
-            doCreateDataverseStatement(metadataProvider, stmtCreateDataverse);
+            doCreateDataverseStatement(metadataProvider, stmtCreateDataverse, requestParameters);
         } finally {
             metadataProvider.getLocks().unlock();
         }
@@ -520,7 +520,7 @@
 
     @SuppressWarnings("squid:S00112")
     protected boolean doCreateDataverseStatement(MetadataProvider metadataProvider,
-            CreateDataverseStatement stmtCreateDataverse) throws Exception {
+            CreateDataverseStatement stmtCreateDataverse, IRequestParameters requestParameters) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
@@ -929,35 +929,43 @@
     public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-        SourceLocation sourceLoc = stmtCreateIndex.getSourceLocation();
         DataverseName dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
-        String indexName = stmtCreateIndex.getIndexName().getValue();
-        IndexType indexType = stmtCreateIndex.getIndexType();
-        List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
-        Dataset ds;
-        Index index;
         lockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
         try {
+            doCreateIndex(metadataProvider, stmtCreateIndex, dataverseName, datasetName, hcc, requestParameters);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    protected void doCreateIndex(MetadataProvider metadataProvider, CreateIndexStatement stmtCreateIndex,
+            DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+        SourceLocation sourceLoc = stmtCreateIndex.getSourceLocation();
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
             // Check if the dataverse exists
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
 
-            ds = metadataProvider.findDataset(dataverseName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                         dataverseName);
             }
 
             DatasetType datasetType = ds.getDatasetType();
+            IndexType indexType = stmtCreateIndex.getIndexType();
+            boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
             validateIndexType(datasetType, indexType, isSecondaryPrimary, sourceLoc);
 
-            index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+            String indexName = stmtCreateIndex.getIndexName().getValue();
+            Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName, indexName);
             if (index != null) {
                 if (stmtCreateIndex.getIfNotExists()) {
@@ -968,9 +976,12 @@
                 }
             }
 
-            // find keySourceIndicators for secondary primary index since the parser isn't aware of them
+            List<Integer> keySourceIndicators;
             if (isSecondaryPrimary && datasetType == DatasetType.INTERNAL) {
+                // find keySourceIndicators for secondary primary index since the parser isn't aware of them
                 keySourceIndicators = ((InternalDatasetDetails) ds.getDatasetDetails()).getKeySourceIndicator();
+            } else {
+                keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
             }
             // disable creating an index on meta fields (fields with source indicator == 1 are meta fields)
             if (keySourceIndicators.stream().anyMatch(fieldSource -> fieldSource == 1) && !isSecondaryPrimary) {
@@ -1094,13 +1105,19 @@
             Index newIndex = new Index(dataverseName, datasetName, indexName, indexType, indexFields,
                     keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes,
                     stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
-            doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags, sourceLoc);
-        } finally {
-            metadataProvider.getLocks().unlock();
+
+            bActiveTxn = false; // doCreateIndexImpl() takes over the current transaction
+            doCreateIndexImpl(hcc, metadataProvider, ds, newIndex, jobFlags, sourceLoc);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            throw e;
         }
     }
 
-    protected void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
+    private void doCreateIndexImpl(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
             Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
         boolean bActiveTxn = true;
@@ -1447,6 +1464,13 @@
                 }
             }
 
+            if (stmtDropDataverse.getIfEmpty() && isDataverseNotEmpty(dataverseName, mdTxnCtx)) {
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                return false;
+            }
+
+            validateDataverseStateBeforeDrop(metadataProvider, dv, sourceLoc);
+
             // #. prepare jobs which will drop corresponding feed storage
             ActiveNotificationHandler activeEventHandler =
                     (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -1580,6 +1604,16 @@
         }
     }
 
+    protected boolean isDataverseNotEmpty(DataverseName dataverseName, MetadataTransactionContext mdTxnCtx)
+            throws AlgebricksException {
+        return MetadataManager.INSTANCE.isDataverseNotEmpty(mdTxnCtx, dataverseName);
+    }
+
+    protected void validateDataverseStateBeforeDrop(MetadataProvider metadataProvider, Dataverse dataverse,
+            SourceLocation sourceLoc) throws AlgebricksException {
+        // may be overriden by product extensions for additional checks before dropping the dataverse
+    }
+
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
@@ -1596,7 +1630,7 @@
         }
     }
 
-    public void doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider,
+    protected boolean doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider,
             boolean ifExists, IHyracksClientConnection hcc, IRequestParameters requestParameters,
             boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc) throws Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
@@ -1615,7 +1649,7 @@
             if (ds == null) {
                 if (ifExists) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-                    return;
+                    return false;
                 } else {
                     throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                             dataverseName);
@@ -1656,6 +1690,7 @@
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            return true;
         } catch (Exception e) {
             if (bActiveTxn.booleanValue()) {
                 abort(e, e, mdTxnCtx.getValue());
@@ -1693,20 +1728,31 @@
 
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
-
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-        SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation();
-        String datasetName = stmtIndexDrop.getDatasetName().getValue();
         DataverseName dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
+        String datasetName = stmtIndexDrop.getDatasetName().getValue();
+        lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+        try {
+            doDropIndex(metadataProvider, stmtIndexDrop, dataverseName, datasetName, hcc, requestParameters);
+        } finally {
+            metadataProvider.getLocks().unlock();
+            ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+        }
+    }
+
+    protected boolean doDropIndex(MetadataProvider metadataProvider, IndexDropStatement stmtIndexDrop,
+            DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+        SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation();
         String indexName = stmtIndexDrop.getIndexName().getValue();
         ProgressState progress = ProgressState.NO_PROGRESS;
+        List<JobSpecification> jobsToExecute = new ArrayList<>();
+        // For external index
+        boolean dropFilesIndex = false;
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        List<JobSpecification> jobsToExecute = new ArrayList<>();
-        lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
-        // For external index
-        boolean dropFilesIndex = false;
         try {
             Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
@@ -1718,7 +1764,7 @@
                 if (index == null) {
                     if (stmtIndexDrop.getIfExists()) {
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
+                        return false;
                     } else {
                         throw new CompilationException(ErrorCode.UNKNOWN_INDEX, sourceLoc, indexName);
                     }
@@ -1759,7 +1805,7 @@
                 if (index == null) {
                     if (stmtIndexDrop.getIfExists()) {
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
+                        return false;
                     } else {
                         throw new CompilationException(ErrorCode.UNKNOWN_INDEX, sourceLoc, indexName);
                     }
@@ -1825,7 +1871,7 @@
                 }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
+            return true;
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -1863,10 +1909,6 @@
             }
 
             throw e;
-
-        } finally {
-            metadataProvider.getLocks().unlock();
-            ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
@@ -2158,7 +2200,7 @@
         }
     }
 
-    protected void doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction,
+    protected boolean doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction,
             FunctionSignature signature) throws Exception {
         SourceLocation sourceLoc = stmtDropFunction.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2166,13 +2208,17 @@
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
             if (function == null) {
-                if (!stmtDropFunction.getIfExists()) {
+                if (stmtDropFunction.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return false;
+                } else {
                     throw new CompilationException(ErrorCode.UNKNOWN_FUNCTION, sourceLoc, signature);
                 }
-            } else {
-                MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             }
+
+            MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return true;
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -2428,7 +2474,7 @@
         }
     }
 
-    protected void doDropSynonym(MetadataProvider metadataProvider, SynonymDropStatement stmtSynDrop,
+    protected boolean doDropSynonym(MetadataProvider metadataProvider, SynonymDropStatement stmtSynDrop,
             DataverseName dataverseName, String synonymName) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2437,12 +2483,13 @@
             if (synonym == null) {
                 if (stmtSynDrop.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
+                    return false;
                 }
                 throw new CompilationException(ErrorCode.UNKNOWN_SYNONYM, stmtSynDrop.getSourceLocation(), synonymName);
             }
             MetadataManager.INSTANCE.dropSynonym(mdTxnCtx, dataverseName, synonymName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return true;
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java
index c20bd32..fa6bc4e 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java
@@ -28,10 +28,16 @@
 
     private DataverseName dataverseName;
     private boolean ifExists;
+    private boolean ifEmpty;
 
     public DataverseDropStatement(DataverseName dataverseName, boolean ifExists) {
+        this(dataverseName, ifExists, false);
+    }
+
+    public DataverseDropStatement(DataverseName dataverseName, boolean ifExists, boolean ifEmpty) {
         this.dataverseName = dataverseName;
         this.ifExists = ifExists;
+        this.ifEmpty = ifEmpty;
     }
 
     @Override
@@ -47,6 +53,10 @@
         return ifExists;
     }
 
+    public boolean getIfEmpty() {
+        return ifEmpty;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return visitor.visit(this, arg);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 7f50cab..afc0dc5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -180,6 +180,16 @@
     }
 
     @Override
+    public boolean isDataverseNotEmpty(MetadataTransactionContext ctx, DataverseName dataverseName)
+            throws AlgebricksException {
+        try {
+            return metadataNode.isDataverseNotEmpty(ctx.getTxnId(), dataverseName);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+    }
+
+    @Override
     public List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws AlgebricksException {
         try {
             return metadataNode.getDataverses(ctx.getTxnId());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index d5d181f..169e436 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -596,6 +596,18 @@
     }
 
     @Override
+    public boolean isDataverseNotEmpty(TxnId txnId, DataverseName dataverseName) throws AlgebricksException {
+        return !getDataverseDatatypes(txnId, dataverseName).isEmpty()
+                || !getDataverseDatasets(txnId, dataverseName).isEmpty()
+                || !getDataverseLibraries(txnId, dataverseName).isEmpty()
+                || !getDataverseAdapters(txnId, dataverseName).isEmpty()
+                || !getDataverseFunctions(txnId, dataverseName).isEmpty()
+                || !getDataverseFeedPolicies(txnId, dataverseName).isEmpty()
+                || !getDataverseFeeds(txnId, dataverseName).isEmpty()
+                || !getDataverseSynonyms(txnId, dataverseName).isEmpty();
+    }
+
+    @Override
     public void dropDataset(TxnId txnId, DataverseName dataverseName, String datasetName) throws AlgebricksException {
         dropDataset(txnId, dataverseName, datasetName, false);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 09a4a94..da4d2fb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -150,6 +150,16 @@
     void dropDataverse(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
 
     /**
+     * Returns {@code true} if the dataverse with given name is not empty
+     * (i.e. contains any datatypes, datasets or any other entities).
+     *  @param ctx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param dataverseName
+     *            Name of the dataverse.
+     */
+    boolean isDataverseNotEmpty(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
+
+    /**
      * Inserts a new dataset into the metadata.
      *
      * @param ctx
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 0330008..6a450f2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -140,14 +140,24 @@
      *
      * @param txnId
      *            A globally unique id for an active metadata transaction.
-     * @return A list of dataset instances.
+     * @param dataverseName
+     *            Name of the dataverse to drop.
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
-     * @throws RemoteException
      */
     void dropDataverse(TxnId txnId, DataverseName dataverseName) throws AlgebricksException, RemoteException;
 
     /**
+     * Returns {@code true} if given dataverse is not empty
+     * (i.e. contains any datatypes, datasets or any other entities).
+     *  @param txnId
+     *            A globally unique id for an active metadata transaction.
+     * @param dataverseName
+     *            Name of the dataverse
+     */
+    boolean isDataverseNotEmpty(TxnId txnId, DataverseName dataverseName) throws AlgebricksException, RemoteException;
+
+    /**
      * Inserts a new dataset into the metadata, acquiring local locks on behalf of
      * the given transaction id.
      *