[ASTERIXDB-2005][CLUS] Treat dataset rebalance to no nodes as drop

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Drop dataset when rebalanced to empty target nodes

Change-Id: I46f687e6006cba952dbfa1fa7771c7c2b7c13472
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1906
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 4174685..ceaf4cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -112,17 +112,21 @@
                 return;
             }
 
-            // Creates a node group for rebalance.
-            String nodeGroupName = DatasetUtil.createNodeGroupForNewDataset(sourceDataset.getDataverseName(),
-                    sourceDataset.getDatasetName(), sourceDataset.getRebalanceCount() + 1, targetNcNames,
-                    metadataProvider);
+            if (!targetNcNames.isEmpty()) {
+                // Creates a node group for rebalance.
+                String nodeGroupName = DatasetUtil
+                        .createNodeGroupForNewDataset(sourceDataset.getDataverseName(), sourceDataset.getDatasetName(),
+                                sourceDataset.getRebalanceCount() + 1, targetNcNames, metadataProvider);
+                // The target dataset for rebalance.
+                targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
 
-            // The target dataset for rebalance.
-            targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
-
-            // Rebalances the source dataset into the target dataset.
-            rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
-
+                // Rebalances the source dataset into the target dataset.
+                rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
+            } else {
+                targetDataset = null;
+                // if this the last NC in the cluster, just drop the dataset
+                purgeDataset(sourceDataset, metadataProvider, hcc);
+            }
             // Complete the metadata transaction.
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -130,6 +134,10 @@
             throw e;
         }
 
+        if (targetNcNames.isEmpty()) {
+            // Nothing else to do since the dataset was dropped.
+            return;
+        }
         // Up to this point, since the bulk part of a rebalance operation is done,
         // the following two operations will retry after interrupt and finally rethrow InterruptedException,
         // which means that they will always succeed and could possibly throw InterruptedException as the last step.
@@ -243,7 +251,11 @@
         // been detached at this point.
         dropDatasetFiles(source, metadataProvider, hcc);
 
-        // Drops the metadata entry of source dataset's node group.
+        tryDropDatasetNodegroup(source, metadataProvider);
+    }
+
+    // Drops the metadata entry of source dataset's node group.
+    private static void tryDropDatasetNodegroup(Dataset source, MetadataProvider metadataProvider) throws Exception {
         ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
         String sourceNodeGroup = source.getNodeGroupName();
         appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), sourceNodeGroup);
@@ -373,4 +385,19 @@
         }
         return pkIndexes;
     }
+
+    private static void purgeDataset(Dataset dataset, MetadataProvider metadataProvider, IHyracksClientConnection hcc)
+            throws Exception {
+        runWithRetryAfterInterrupt(() -> {
+            // drop dataset files
+            dropDatasetFiles(dataset, metadataProvider, hcc);
+
+            // drop dataset entry from metadata
+            runMetadataTransaction(metadataProvider, () -> MetadataManager.INSTANCE
+                    .dropDataset(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(),
+                            dataset.getDatasetName()));
+            // try to drop the dataset's node group
+            runMetadataTransaction(metadataProvider, () -> tryDropDatasetNodegroup(dataset, metadataProvider));
+        });
+    }
 }