[NO ISSUE][*DB] Provide ability for IDatasetRebalanceCallback to decline a rebalance

Change-Id: I1b2cdb56d70ce47c7b15061cc5ecc4af36ed8f61
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17608
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
index e8683c9..c366977 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -30,7 +30,7 @@
 public interface IDatasetRebalanceCallback {
 
     /**
-     * The action to perform before the target dataset is populated.
+     * The check to perform before the target dataset is populated.
      *
      * @param metadataProvider,
      *            the metadata provider.
@@ -40,9 +40,13 @@
      *            the target dataset.
      * @param hcc,
      *            the hyracks client connection.
+     *
+     * @return <code>true</code> if the rebalance of the dataset should proceed, otherwise <code>false</code> to skip.
+     *         If the dataset is skipped, the active metadata transaction context, if any, can be expected to be aborted.
+     *
      * @throws HyracksDataException
      */
-    void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+    boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
             IHyracksClientConnection hcc) throws HyracksDataException;
 
     /**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
index 680adbf..7085567 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -33,9 +33,10 @@
     }
 
     @Override
-    public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+    public boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
             IHyracksClientConnection hcc) {
         // Does nothing.
+        return true;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 61562d8..601cd02 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -20,6 +20,8 @@
 
 import static org.apache.asterix.app.translator.QueryTranslator.abort;
 import static org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
+import static org.apache.asterix.metadata.utils.DatasetUtil.getFullyQualifiedDisplayName;
 import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
 import java.rmi.RemoteException;
@@ -90,13 +92,15 @@
      *            the metadata provider.
      * @param hcc,
      *            the reusable hyracks connection.
+     * @return <code>false</code> if the rebalance was safely skipped
      * @throws Exception
      */
-    public static void rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
+    public static boolean rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc,
             IDatasetRebalanceCallback datasetRebalanceCallback, boolean forceRebalance) throws Exception {
         Dataset sourceDataset;
         Dataset targetDataset;
+        boolean success = true;
         // Executes the first Metadata transaction.
         // Generates the rebalance target files. While doing that, hold read locks on the dataset so
         // that no one can drop the rebalance source dataset.
@@ -108,13 +112,13 @@
 
             // If the source dataset doesn't exist, then it's a no-op.
             if (sourceDataset == null) {
-                return;
+                return true;
             }
 
             Set<String> sourceNodes = new HashSet<>(metadataProvider.findNodes(sourceDataset.getNodeGroupName()));
 
             if (!forceRebalance && sourceNodes.equals(targetNcNames)) {
-                return;
+                return true;
             }
 
             if (!targetNcNames.isEmpty()) {
@@ -125,20 +129,25 @@
                 // The target dataset for rebalance.
                 targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
 
-                LOGGER.info("Rebalancing dataset {} from node group {} with nodes {} to node group {} with nodes {}",
-                        sourceDataset.getDatasetName(), sourceDataset.getNodeGroupName(), sourceNodes,
-                        targetDataset.getNodeGroupName(), targetNcNames);
+                LOGGER.info("Rebalancing {} {} from node group {} with nodes {} to node group {} with nodes {}",
+                        dataset(), getFullyQualifiedDisplayName(sourceDataset), sourceDataset.getNodeGroupName(),
+                        sourceNodes, targetDataset.getNodeGroupName(), targetNcNames);
                 // Rebalances the source dataset into the target dataset.
                 if (sourceDataset.getDatasetType() != DatasetType.EXTERNAL) {
-                    rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
+                    success = rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
                 }
             } else {
                 targetDataset = null;
                 // if this the last NC in the cluster, just drop the dataset
                 purgeDataset(sourceDataset, metadataProvider, hcc);
             }
-            // Complete the metadata transaction.
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            if (success) {
+                // Complete the metadata transaction.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } else {
+                // Abort the metadata transaction, since we failed to rebalance the dataset
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -146,7 +155,10 @@
 
         if (targetNcNames.isEmpty()) {
             // Nothing else to do since the dataset was dropped.
-            return;
+            return true;
+        } else if (!success) {
+            LOGGER.info("Dataset {} rebalance was skipped, see above log for reason", datasetName);
+            return false;
         }
         // Up to this point, since the bulk part of a rebalance operation is done,
         // the following two operations will retry after interrupt and finally rethrow InterruptedException,
@@ -165,6 +177,7 @@
             runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
         });
         LOGGER.info("Dataset {} rebalance completed successfully", datasetName);
+        return true;
     }
 
     @FunctionalInterface
@@ -214,13 +227,16 @@
     }
 
     // Rebalances from the source to the target.
-    private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
+    private static boolean rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
         // Drops the target dataset files (if any) to make rebalance idempotent.
         dropDatasetFiles(target, metadataProvider, hcc);
 
         // Performs the specified operation before the target dataset is populated.
-        datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc);
+        if (!datasetRebalanceCallback.canRebalance(metadataProvider, source, target, hcc)) {
+            // the callback indicates that this rebalance should be skipped; short circuit the remaining steps
+            return false;
+        }
 
         // Creates the rebalance target.
         createRebalanceTarget(target, metadataProvider, hcc);
@@ -233,6 +249,8 @@
 
         // Performs the specified operation after the target dataset is populated.
         datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc);
+
+        return true;
     }
 
     // Switches the metadata entity from the source dataset to the target dataset.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
index a8d027f..a50fd4a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.metadata.entities;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -42,7 +43,7 @@
         this.nodeNames = nodeNames;
     }
 
-    public static NodeGroup createOrdered(String groupName, List<String> nodeNames) {
+    public static NodeGroup createOrdered(String groupName, Collection<String> nodeNames) {
         List<String> sortedNodeNames = new ArrayList<>(nodeNames);
         Collections.sort(sortedNodeNames);
         return new NodeGroup(groupName, sortedNodeNames);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6529794..b2e6817 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -21,7 +21,6 @@
 import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
 
 import java.io.DataOutput;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -613,10 +612,10 @@
         appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
         NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup);
         if (ng != null) {
-            nodeGroup = nodeGroup + "_" + UUID.randomUUID().toString();
+            nodeGroup = nodeGroup + "_" + UUID.randomUUID();
             appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
         }
-        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, new ArrayList<>(ncNames)));
+        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, ncNames));
         return nodeGroup;
     }