[NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state

Allow extensions to mandate that a rebalance is required in order for
the cluster to go active

Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3394
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index c2d3303..e13756c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -30,8 +30,8 @@
         PENDING, // the metadata node has not yet joined & initialized
         RECOVERING, // global recovery has not yet completed
         ACTIVE, // cluster is ACTIVE and ready for requests
-        REBALANCING, // replication is processing failbacks
-        SHUTTING_DOWN // a shutdown request has been received, and is underway
+        SHUTTING_DOWN, // a shutdown request has been received, and is underway
+        REBALANCE_REQUIRED // one or more datasets require rebalance before the cluster is usable
     }
 
     WorkType getClusterManagementWorkType();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0e62851..6c39372 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -110,6 +111,14 @@
             throws HyracksDataException, InterruptedException;
 
     /**
+     * Blocks until the cluster state matches supplied predicate, or timeout is exhausted.
+     *
+     * @return the cluster state matching the predicate if it was satisfied before timeout occurred, otherwise null
+     */
+    ClusterState waitForState(Predicate<ClusterState> condition, long timeout, TimeUnit unit)
+            throws HyracksDataException, InterruptedException;
+
+    /**
      * Register the specified node partitions with the specified nodeId with this cluster state manager
      * then calls {@link IClusterStateManager#refreshState()}
      *
@@ -250,4 +259,10 @@
      * @return The metadata cluster partitions
      */
     ClusterPartition getMetadataPartition();
+
+    /**
+     * Indicate whether one or more datasets must be rebalanced before the cluster becomes ACTIVE
+     * @param rebalanceRequired
+     */
+    void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 16a479e..7933cd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -28,6 +28,7 @@
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -75,6 +76,7 @@
     private INcLifecycleCoordinator lifecycleCoordinator;
     private ICcApplicationContext appCtx;
     private ClusterPartition metadataPartition;
+    private boolean rebalanceRequired;
 
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -186,45 +188,55 @@
             return;
         }
         // the metadata bootstrap & global recovery must be complete before the cluster can be active
-        if (metadataNodeActive) {
-            if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
-                setState(ClusterState.PENDING);
-            }
-            appCtx.getMetadataBootstrap().init();
-
-            if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
-                setState(ClusterState.ACTIVE);
-            } else {
-                // start global recovery
-                setState(ClusterState.RECOVERING);
-                appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
-            }
-        } else {
+        if (!metadataNodeActive) {
+            setState(ClusterState.PENDING);
+            return;
+        }
+        if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
             setState(ClusterState.PENDING);
         }
+        appCtx.getMetadataBootstrap().init();
+
+        if (!appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+            // start global recovery
+            setState(ClusterState.RECOVERING);
+            appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+            return;
+        }
+        if (rebalanceRequired) {
+            setState(ClusterState.REBALANCE_REQUIRED);
+            return;
+        }
+        // finally- life is good, set the state to ACTIVE
+        setState(ClusterState.ACTIVE);
     }
 
     @Override
-    public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException {
+    public synchronized void waitForState(ClusterState waitForState) throws InterruptedException {
         while (state != waitForState) {
             wait();
         }
     }
 
     @Override
-    public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
-            throws HyracksDataException, InterruptedException {
+    public boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) throws InterruptedException {
+        return waitForState(waitForState::equals, timeout, unit) != null;
+    }
+
+    @Override
+    public synchronized ClusterState waitForState(Predicate<ClusterState> predicate, long timeout, TimeUnit unit)
+            throws InterruptedException {
         final long startMillis = System.currentTimeMillis();
         final long endMillis = startMillis + unit.toMillis(timeout);
-        while (state != waitForState) {
+        while (!predicate.test(state)) {
             long millisToSleep = endMillis - System.currentTimeMillis();
             if (millisToSleep > 0) {
                 wait(millisToSleep);
             } else {
-                return false;
+                return null;
             }
         }
-        return true;
+        return state;
     }
 
     @Override
@@ -458,6 +470,12 @@
         return metadataPartition;
     }
 
+    @Override
+    public synchronized void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException {
+        this.rebalanceRequired = rebalanceRequired;
+        refreshState();
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
         final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());