diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b2477a4..39d9c48d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -512,7 +512,7 @@
         DataverseName dvName = stmtCreateDataverse.getDataverseName();
         lockUtil.createDataverseBegin(lockManager, metadataProvider.getLocks(), dvName);
         try {
-            doCreateDataverseStatement(metadataProvider, stmtCreateDataverse);
+            doCreateDataverseStatement(metadataProvider, stmtCreateDataverse, requestParameters);
         } finally {
             metadataProvider.getLocks().unlock();
         }
@@ -520,7 +520,7 @@
 
     @SuppressWarnings("squid:S00112")
     protected boolean doCreateDataverseStatement(MetadataProvider metadataProvider,
-            CreateDataverseStatement stmtCreateDataverse) throws Exception {
+            CreateDataverseStatement stmtCreateDataverse, IRequestParameters requestParameters) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
@@ -929,35 +929,43 @@
     public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-        SourceLocation sourceLoc = stmtCreateIndex.getSourceLocation();
         DataverseName dataverseName = getActiveDataverseName(stmtCreateIndex.getDataverseName());
         String datasetName = stmtCreateIndex.getDatasetName().getValue();
-        String indexName = stmtCreateIndex.getIndexName().getValue();
-        IndexType indexType = stmtCreateIndex.getIndexType();
-        List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
-        Dataset ds;
-        Index index;
         lockUtil.createIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
         try {
+            doCreateIndex(metadataProvider, stmtCreateIndex, dataverseName, datasetName, hcc, requestParameters);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
+
+    protected void doCreateIndex(MetadataProvider metadataProvider, CreateIndexStatement stmtCreateIndex,
+            DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+        SourceLocation sourceLoc = stmtCreateIndex.getSourceLocation();
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        try {
             // Check if the dataverse exists
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
             }
 
-            ds = metadataProvider.findDataset(dataverseName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
                 throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                         dataverseName);
             }
 
             DatasetType datasetType = ds.getDatasetType();
+            IndexType indexType = stmtCreateIndex.getIndexType();
+            boolean isSecondaryPrimary = stmtCreateIndex.getFieldExprs().isEmpty();
             validateIndexType(datasetType, indexType, isSecondaryPrimary, sourceLoc);
 
-            index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
+            String indexName = stmtCreateIndex.getIndexName().getValue();
+            Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName, indexName);
             if (index != null) {
                 if (stmtCreateIndex.getIfNotExists()) {
@@ -968,9 +976,12 @@
                 }
             }
 
-            // find keySourceIndicators for secondary primary index since the parser isn't aware of them
+            List<Integer> keySourceIndicators;
             if (isSecondaryPrimary && datasetType == DatasetType.INTERNAL) {
+                // find keySourceIndicators for secondary primary index since the parser isn't aware of them
                 keySourceIndicators = ((InternalDatasetDetails) ds.getDatasetDetails()).getKeySourceIndicator();
+            } else {
+                keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
             }
             // disable creating an index on meta fields (fields with source indicator == 1 are meta fields)
             if (keySourceIndicators.stream().anyMatch(fieldSource -> fieldSource == 1) && !isSecondaryPrimary) {
@@ -1094,13 +1105,19 @@
             Index newIndex = new Index(dataverseName, datasetName, indexName, indexType, indexFields,
                     keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes,
                     stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
-            doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags, sourceLoc);
-        } finally {
-            metadataProvider.getLocks().unlock();
+
+            bActiveTxn = false; // doCreateIndexImpl() takes over the current transaction
+            doCreateIndexImpl(hcc, metadataProvider, ds, newIndex, jobFlags, sourceLoc);
+
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            throw e;
         }
     }
 
-    protected void doCreateIndex(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
+    private void doCreateIndexImpl(IHyracksClientConnection hcc, MetadataProvider metadataProvider, Dataset ds,
             Index index, EnumSet<JobFlag> jobFlags, SourceLocation sourceLoc) throws Exception {
         ProgressState progress = ProgressState.NO_PROGRESS;
         boolean bActiveTxn = true;
@@ -1447,6 +1464,13 @@
                 }
             }
 
+            if (stmtDropDataverse.getIfEmpty() && isDataverseNotEmpty(dataverseName, mdTxnCtx)) {
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                return false;
+            }
+
+            validateDataverseStateBeforeDrop(metadataProvider, dv, sourceLoc);
+
             // #. prepare jobs which will drop corresponding feed storage
             ActiveNotificationHandler activeEventHandler =
                     (ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
@@ -1580,6 +1604,16 @@
         }
     }
 
+    protected boolean isDataverseNotEmpty(DataverseName dataverseName, MetadataTransactionContext mdTxnCtx)
+            throws AlgebricksException {
+        return MetadataManager.INSTANCE.isDataverseNotEmpty(mdTxnCtx, dataverseName);
+    }
+
+    protected void validateDataverseStateBeforeDrop(MetadataProvider metadataProvider, Dataverse dataverse,
+            SourceLocation sourceLoc) throws AlgebricksException {
+        // may be overriden by product extensions for additional checks before dropping the dataverse
+    }
+
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
@@ -1596,7 +1630,7 @@
         }
     }
 
-    public void doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider,
+    protected boolean doDropDataset(DataverseName dataverseName, String datasetName, MetadataProvider metadataProvider,
             boolean ifExists, IHyracksClientConnection hcc, IRequestParameters requestParameters,
             boolean dropCorrespondingNodeGroup, SourceLocation sourceLoc) throws Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
@@ -1615,7 +1649,7 @@
             if (ds == null) {
                 if (ifExists) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-                    return;
+                    return false;
                 } else {
                     throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                             dataverseName);
@@ -1656,6 +1690,7 @@
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            return true;
         } catch (Exception e) {
             if (bActiveTxn.booleanValue()) {
                 abort(e, e, mdTxnCtx.getValue());
@@ -1693,20 +1728,31 @@
 
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
-
         IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-        SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation();
-        String datasetName = stmtIndexDrop.getDatasetName().getValue();
         DataverseName dataverseName = getActiveDataverseName(stmtIndexDrop.getDataverseName());
+        String datasetName = stmtIndexDrop.getDatasetName().getValue();
+        lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
+        try {
+            doDropIndex(metadataProvider, stmtIndexDrop, dataverseName, datasetName, hcc, requestParameters);
+        } finally {
+            metadataProvider.getLocks().unlock();
+            ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
+        }
+    }
+
+    protected boolean doDropIndex(MetadataProvider metadataProvider, IndexDropStatement stmtIndexDrop,
+            DataverseName dataverseName, String datasetName, IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+        SourceLocation sourceLoc = stmtIndexDrop.getSourceLocation();
         String indexName = stmtIndexDrop.getIndexName().getValue();
         ProgressState progress = ProgressState.NO_PROGRESS;
+        List<JobSpecification> jobsToExecute = new ArrayList<>();
+        // For external index
+        boolean dropFilesIndex = false;
+
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        List<JobSpecification> jobsToExecute = new ArrayList<>();
-        lockUtil.dropIndexBegin(lockManager, metadataProvider.getLocks(), dataverseName, datasetName);
-        // For external index
-        boolean dropFilesIndex = false;
         try {
             Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
@@ -1718,7 +1764,7 @@
                 if (index == null) {
                     if (stmtIndexDrop.getIfExists()) {
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
+                        return false;
                     } else {
                         throw new CompilationException(ErrorCode.UNKNOWN_INDEX, sourceLoc, indexName);
                     }
@@ -1759,7 +1805,7 @@
                 if (index == null) {
                     if (stmtIndexDrop.getIfExists()) {
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                        return;
+                        return false;
                     } else {
                         throw new CompilationException(ErrorCode.UNKNOWN_INDEX, sourceLoc, indexName);
                     }
@@ -1825,7 +1871,7 @@
                 }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
+            return true;
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -1863,10 +1909,6 @@
             }
 
             throw e;
-
-        } finally {
-            metadataProvider.getLocks().unlock();
-            ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
 
@@ -2158,7 +2200,7 @@
         }
     }
 
-    protected void doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction,
+    protected boolean doDropFunction(MetadataProvider metadataProvider, FunctionDropStatement stmtDropFunction,
             FunctionSignature signature) throws Exception {
         SourceLocation sourceLoc = stmtDropFunction.getSourceLocation();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2166,13 +2208,17 @@
         try {
             Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
             if (function == null) {
-                if (!stmtDropFunction.getIfExists()) {
+                if (stmtDropFunction.getIfExists()) {
+                    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+                    return false;
+                } else {
                     throw new CompilationException(ErrorCode.UNKNOWN_FUNCTION, sourceLoc, signature);
                 }
-            } else {
-                MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             }
+
+            MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return true;
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -2428,7 +2474,7 @@
         }
     }
 
-    protected void doDropSynonym(MetadataProvider metadataProvider, SynonymDropStatement stmtSynDrop,
+    protected boolean doDropSynonym(MetadataProvider metadataProvider, SynonymDropStatement stmtSynDrop,
             DataverseName dataverseName, String synonymName) throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2437,12 +2483,13 @@
             if (synonym == null) {
                 if (stmtSynDrop.getIfExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    return;
+                    return false;
                 }
                 throw new CompilationException(ErrorCode.UNKNOWN_SYNONYM, stmtSynDrop.getSourceLocation(), synonymName);
             }
             MetadataManager.INSTANCE.dropSynonym(mdTxnCtx, dataverseName, synonymName);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            return true;
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java
index c20bd32..fa6bc4e 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DataverseDropStatement.java
@@ -28,10 +28,16 @@
 
     private DataverseName dataverseName;
     private boolean ifExists;
+    private boolean ifEmpty;
 
     public DataverseDropStatement(DataverseName dataverseName, boolean ifExists) {
+        this(dataverseName, ifExists, false);
+    }
+
+    public DataverseDropStatement(DataverseName dataverseName, boolean ifExists, boolean ifEmpty) {
         this.dataverseName = dataverseName;
         this.ifExists = ifExists;
+        this.ifEmpty = ifEmpty;
     }
 
     @Override
@@ -47,6 +53,10 @@
         return ifExists;
     }
 
+    public boolean getIfEmpty() {
+        return ifEmpty;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
         return visitor.visit(this, arg);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 7f50cab..afc0dc5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -180,6 +180,16 @@
     }
 
     @Override
+    public boolean isDataverseNotEmpty(MetadataTransactionContext ctx, DataverseName dataverseName)
+            throws AlgebricksException {
+        try {
+            return metadataNode.isDataverseNotEmpty(ctx.getTxnId(), dataverseName);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+    }
+
+    @Override
     public List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws AlgebricksException {
         try {
             return metadataNode.getDataverses(ctx.getTxnId());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index d5d181f..169e436 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -596,6 +596,18 @@
     }
 
     @Override
+    public boolean isDataverseNotEmpty(TxnId txnId, DataverseName dataverseName) throws AlgebricksException {
+        return !getDataverseDatatypes(txnId, dataverseName).isEmpty()
+                || !getDataverseDatasets(txnId, dataverseName).isEmpty()
+                || !getDataverseLibraries(txnId, dataverseName).isEmpty()
+                || !getDataverseAdapters(txnId, dataverseName).isEmpty()
+                || !getDataverseFunctions(txnId, dataverseName).isEmpty()
+                || !getDataverseFeedPolicies(txnId, dataverseName).isEmpty()
+                || !getDataverseFeeds(txnId, dataverseName).isEmpty()
+                || !getDataverseSynonyms(txnId, dataverseName).isEmpty();
+    }
+
+    @Override
     public void dropDataset(TxnId txnId, DataverseName dataverseName, String datasetName) throws AlgebricksException {
         dropDataset(txnId, dataverseName, datasetName, false);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 09a4a94..da4d2fb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -150,6 +150,16 @@
     void dropDataverse(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
 
     /**
+     * Returns {@code true} if the dataverse with given name is not empty
+     * (i.e. contains any datatypes, datasets or any other entities).
+     *  @param ctx
+     *            MetadataTransactionContext of an active metadata transaction.
+     * @param dataverseName
+     *            Name of the dataverse.
+     */
+    boolean isDataverseNotEmpty(MetadataTransactionContext ctx, DataverseName dataverseName) throws AlgebricksException;
+
+    /**
      * Inserts a new dataset into the metadata.
      *
      * @param ctx
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 0330008..6a450f2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -140,14 +140,24 @@
      *
      * @param txnId
      *            A globally unique id for an active metadata transaction.
-     * @return A list of dataset instances.
+     * @param dataverseName
+     *            Name of the dataverse to drop.
      * @throws AlgebricksException
      *             For example, if the dataverse does not exist.
-     * @throws RemoteException
      */
     void dropDataverse(TxnId txnId, DataverseName dataverseName) throws AlgebricksException, RemoteException;
 
     /**
+     * Returns {@code true} if given dataverse is not empty
+     * (i.e. contains any datatypes, datasets or any other entities).
+     *  @param txnId
+     *            A globally unique id for an active metadata transaction.
+     * @param dataverseName
+     *            Name of the dataverse
+     */
+    boolean isDataverseNotEmpty(TxnId txnId, DataverseName dataverseName) throws AlgebricksException, RemoteException;
+
+    /**
      * Inserts a new dataset into the metadata, acquiring local locks on behalf of
      * the given transaction id.
      *
