[NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state
Allow extensions to mandate that a rebalance is required before the
cluster can become ACTIVE; until then it reports REBALANCE_REQUIRED.
Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3394
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index c2d3303..e13756c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -30,8 +30,8 @@
PENDING, // the metadata node has not yet joined & initialized
RECOVERING, // global recovery has not yet completed
ACTIVE, // cluster is ACTIVE and ready for requests
- REBALANCING, // replication is processing failbacks
- SHUTTING_DOWN // a shutdown request has been received, and is underway
+ SHUTTING_DOWN, // a shutdown request has been received, and is underway
+ REBALANCE_REQUIRED // one or more datasets require rebalance before the cluster is usable
}
WorkType getClusterManagementWorkType();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0e62851..6c39372 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -110,6 +111,14 @@
throws HyracksDataException, InterruptedException;
/**
+ * Blocks until the cluster state satisfies the supplied predicate, or the timeout is exhausted.
+ *
+ * @return the cluster state matching the predicate if it was satisfied before timeout occurred, otherwise null
+ */
+ ClusterState waitForState(Predicate<ClusterState> condition, long timeout, TimeUnit unit)
+ throws HyracksDataException, InterruptedException;
+
+ /**
* Register the specified node partitions with the specified nodeId with this cluster state manager
* then calls {@link IClusterStateManager#refreshState()}
*
@@ -250,4 +259,10 @@
* @return The metadata cluster partitions
*/
ClusterPartition getMetadataPartition();
+
+ /**
+ * Indicates whether one or more datasets must be rebalanced before the cluster can become ACTIVE.
+ * @param rebalanceRequired {@code true} if a rebalance is required before the cluster can become ACTIVE
+ */
+ void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 16a479e..7933cd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -28,6 +28,7 @@
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -75,6 +76,7 @@
private INcLifecycleCoordinator lifecycleCoordinator;
private ICcApplicationContext appCtx;
private ClusterPartition metadataPartition;
+ private boolean rebalanceRequired;
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -186,45 +188,55 @@
return;
}
// the metadata bootstrap & global recovery must be complete before the cluster can be active
- if (metadataNodeActive) {
- if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
- setState(ClusterState.PENDING);
- }
- appCtx.getMetadataBootstrap().init();
-
- if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
- setState(ClusterState.ACTIVE);
- } else {
- // start global recovery
- setState(ClusterState.RECOVERING);
- appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
- }
- } else {
+ if (!metadataNodeActive) {
+ setState(ClusterState.PENDING);
+ return;
+ }
+ if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
setState(ClusterState.PENDING);
}
+ appCtx.getMetadataBootstrap().init();
+
+ if (!appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+ // start global recovery
+ setState(ClusterState.RECOVERING);
+ appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+ return;
+ }
+ if (rebalanceRequired) {
+ setState(ClusterState.REBALANCE_REQUIRED);
+ return;
+ }
+ // metadata node is active, recovery is complete & no rebalance is required: set the state to ACTIVE
+ setState(ClusterState.ACTIVE);
}
@Override
- public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException {
+ public synchronized void waitForState(ClusterState waitForState) throws InterruptedException {
while (state != waitForState) {
wait();
}
}
@Override
- public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
- throws HyracksDataException, InterruptedException {
+ public boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) throws InterruptedException {
+ return waitForState(waitForState::equals, timeout, unit) != null;
+ }
+
+ @Override
+ public synchronized ClusterState waitForState(Predicate<ClusterState> predicate, long timeout, TimeUnit unit)
+ throws InterruptedException {
final long startMillis = System.currentTimeMillis();
final long endMillis = startMillis + unit.toMillis(timeout);
- while (state != waitForState) {
+ while (!predicate.test(state)) {
long millisToSleep = endMillis - System.currentTimeMillis();
if (millisToSleep > 0) {
wait(millisToSleep);
} else {
- return false;
+ return null;
}
}
- return true;
+ return state;
}
@Override
@@ -458,6 +470,12 @@
return metadataPartition;
}
+ @Override
+ public synchronized void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException {
+ this.rebalanceRequired = rebalanceRequired;
+ refreshState();
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());