[NO ISSUE][*DB] Provide ability for IDatasetRebalanceCallback to decline a rebalance
Change-Id: I1b2cdb56d70ce47c7b15061cc5ecc4af36ed8f61
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17608
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
index e8683c9..c366977 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -30,7 +30,7 @@
public interface IDatasetRebalanceCallback {
/**
- * The action to perform before the target dataset is populated.
+ * The check to perform before the target dataset is populated.
*
* @param metadataProvider,
* the metadata provider.
@@ -40,9 +40,13 @@
* the target dataset.
* @param hcc,
* the hyracks client connection.
+ *
+ * @return <code>true</code> if the rebalance of the dataset should proceed, otherwise <code>false</code> to skip.
+ * If the dataset is skipped, the active metadata transaction context, if any, can be expected to be aborted.
+ *
* @throws HyracksDataException
*/
- void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
IHyracksClientConnection hcc) throws HyracksDataException;
/**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
index 680adbf..7085567 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -33,9 +33,10 @@
}
@Override
- public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+ public boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
IHyracksClientConnection hcc) {
// Does nothing.
+ return true;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 61562d8..601cd02 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -20,6 +20,8 @@
import static org.apache.asterix.app.translator.QueryTranslator.abort;
import static org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
+import static org.apache.asterix.metadata.utils.DatasetUtil.getFullyQualifiedDisplayName;
import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import java.rmi.RemoteException;
@@ -90,13 +92,15 @@
* the metadata provider.
* @param hcc,
* the reusable hyracks connection.
+ * @return <code>false</code> if the rebalance was safely skipped, otherwise <code>true</code>
* @throws Exception
*/
- public static void rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
+ public static boolean rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IDatasetRebalanceCallback datasetRebalanceCallback, boolean forceRebalance) throws Exception {
Dataset sourceDataset;
Dataset targetDataset;
+ boolean success = true;
// Executes the first Metadata transaction.
// Generates the rebalance target files. While doing that, hold read locks on the dataset so
// that no one can drop the rebalance source dataset.
@@ -108,13 +112,13 @@
// If the source dataset doesn't exist, then it's a no-op.
if (sourceDataset == null) {
- return;
+ return true;
}
Set<String> sourceNodes = new HashSet<>(metadataProvider.findNodes(sourceDataset.getNodeGroupName()));
if (!forceRebalance && sourceNodes.equals(targetNcNames)) {
- return;
+ return true;
}
if (!targetNcNames.isEmpty()) {
@@ -125,20 +129,25 @@
// The target dataset for rebalance.
targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
- LOGGER.info("Rebalancing dataset {} from node group {} with nodes {} to node group {} with nodes {}",
- sourceDataset.getDatasetName(), sourceDataset.getNodeGroupName(), sourceNodes,
- targetDataset.getNodeGroupName(), targetNcNames);
+ LOGGER.info("Rebalancing {} {} from node group {} with nodes {} to node group {} with nodes {}",
+ dataset(), getFullyQualifiedDisplayName(sourceDataset), sourceDataset.getNodeGroupName(),
+ sourceNodes, targetDataset.getNodeGroupName(), targetNcNames);
// Rebalances the source dataset into the target dataset.
if (sourceDataset.getDatasetType() != DatasetType.EXTERNAL) {
- rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
+ success = rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
}
} else {
targetDataset = null;
// if this the last NC in the cluster, just drop the dataset
purgeDataset(sourceDataset, metadataProvider, hcc);
}
- // Complete the metadata transaction.
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ if (success) {
+ // Complete the metadata transaction.
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } else {
+ // Abort the metadata transaction, since the rebalance of the dataset was skipped
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
} catch (Exception e) {
abort(e, e, mdTxnCtx);
throw e;
@@ -146,7 +155,10 @@
if (targetNcNames.isEmpty()) {
// Nothing else to do since the dataset was dropped.
- return;
+ return true;
+ } else if (!success) {
+ LOGGER.info("Dataset {} rebalance was skipped, see above log for reason", datasetName);
+ return false;
}
// Up to this point, since the bulk part of a rebalance operation is done,
// the following two operations will retry after interrupt and finally rethrow InterruptedException,
@@ -165,6 +177,7 @@
runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
});
LOGGER.info("Dataset {} rebalance completed successfully", datasetName);
+ return true;
}
@FunctionalInterface
@@ -214,13 +227,16 @@
}
// Rebalances from the source to the target.
- private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
+ private static boolean rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
// Drops the target dataset files (if any) to make rebalance idempotent.
dropDatasetFiles(target, metadataProvider, hcc);
// Performs the specified operation before the target dataset is populated.
- datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc);
+ if (!datasetRebalanceCallback.canRebalance(metadataProvider, source, target, hcc)) {
+ // the callback indicates that this rebalance should be skipped; short circuit the remaining steps
+ return false;
+ }
// Creates the rebalance target.
createRebalanceTarget(target, metadataProvider, hcc);
@@ -233,6 +249,8 @@
// Performs the specified operation after the target dataset is populated.
datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc);
+
+ return true;
}
// Switches the metadata entity from the source dataset to the target dataset.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
index a8d027f..a50fd4a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
@@ -20,6 +20,7 @@
package org.apache.asterix.metadata.entities;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -42,7 +43,7 @@
this.nodeNames = nodeNames;
}
- public static NodeGroup createOrdered(String groupName, List<String> nodeNames) {
+ public static NodeGroup createOrdered(String groupName, Collection<String> nodeNames) {
List<String> sortedNodeNames = new ArrayList<>(nodeNames);
Collections.sort(sortedNodeNames);
return new NodeGroup(groupName, sortedNodeNames);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 6529794..b2e6817 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -21,7 +21,6 @@
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
import java.io.DataOutput;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -613,10 +612,10 @@
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup);
if (ng != null) {
- nodeGroup = nodeGroup + "_" + UUID.randomUUID().toString();
+ nodeGroup = nodeGroup + "_" + UUID.randomUUID();
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
}
- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, new ArrayList<>(ncNames)));
+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, ncNames));
return nodeGroup;
}