Wait For ClusterState ACTIVE Before Notifying ZK

Defer notifying ZooKeeper that the cluster is up until it transitions to
ACTIVE

Change-Id: Ieaaeb2876edad9cfa3f23c2cbe00e058bdc1c8cc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1678
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index 634824c..3cdeb6f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -20,6 +20,7 @@
 
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -38,6 +39,7 @@
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Base class for language translators. Contains the common validation logic for language
@@ -52,17 +54,15 @@
         if (!(ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)
                 && ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted())) {
             int maxWaitCycles = AppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive();
-            int waitCycleCount = 0;
             try {
-                while (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)
-                        && waitCycleCount < maxWaitCycles) {
-                    Thread.sleep(1000);
-                    waitCycleCount++;
-                }
+                ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS);
+            } catch (HyracksDataException e) {
+                throw new AsterixException(e);
             } catch (InterruptedException e) {
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Thread interrupted while waiting for cluster to be " + ClusterState.ACTIVE);
                 }
+                Thread.currentThread().interrupt();
             }
             if (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)) {
                 throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state."
@@ -91,6 +91,7 @@
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Thread interrupted while waiting for cluster to complete global recovery ");
                 }
+                Thread.currentThread().interrupt();
             }
             if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) {
                 throw new AsterixException("Cluster Global recovery is not yet complete and the system is in "
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index f3c9744..53fc7ec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -121,11 +121,7 @@
             }
         }
         // Wait until cluster becomes active
-        synchronized (ClusterStateManager.INSTANCE) {
-            while (ClusterStateManager.INSTANCE.getState() != ClusterState.ACTIVE) {
-                ClusterStateManager.INSTANCE.wait();
-            }
-        }
+        ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
         hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort());
         ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index cb2bf64..578c206 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -24,6 +24,7 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -52,6 +53,7 @@
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
 import org.apache.asterix.common.api.AsterixThreadFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.ExternalProperties;
@@ -71,6 +73,7 @@
 import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -194,7 +197,7 @@
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, AppContextInfo.INSTANCE);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
-                ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
+                ccServiceCtx.getControllerService().getExecutor());
 
         // AQL rest APIs.
         addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -291,13 +294,16 @@
     }
 
     private IStatementExecutorFactory getStatementExecutorFactory() {
-        return ccExtensionManager.getStatementExecutorFactory(
-                ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
+        return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
     }
 
     @Override
     public void startupCompleted() throws Exception {
-        ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
+        ccServiceCtx.getControllerService().getExecutor().submit((Callable)() -> {
+            ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+            ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
+            return null;
+        });
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index bf03d54..a753db3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.cluster;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.hyracks.api.config.IOption;
@@ -83,4 +84,17 @@
      * @return A copy of the current state of the cluster partitions.
      */
     ClusterPartition[] getClusterPartitons();
+
+    /**
+     * Blocks until the cluster state becomes {@code state}
+     */
+    void waitForState(ClusterState state) throws HyracksDataException, InterruptedException;
+
+    /**
+     * Blocks until the cluster state becomes {@code state}, or timeout is exhausted.
+     * @return true if the desired state was reached before timeout occurred
+     */
+    boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
+            throws HyracksDataException, InterruptedException;
+
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index f65979f..6bfbf77 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -40,11 +41,11 @@
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.control.common.controllers.NCConfig;
 
 /**
  * A holder class for properties related to the Asterix cluster.
@@ -110,6 +111,8 @@
     public synchronized void setState(ClusterState state) {
         this.state = state;
         LOGGER.info("Cluster State is now " + state.name());
+        // Notify any waiting threads for the cluster state to change.
+        notifyAll();
     }
 
     @Override
@@ -149,27 +152,48 @@
         resetClusterPartitionConstraint();
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
-                state = ClusterState.UNUSABLE;
-                LOGGER.info("Cluster is in UNUSABLE state");
+                setState(ClusterState.UNUSABLE);
                 return;
             }
         }
 
-        state = ClusterState.PENDING;
+        setState(ClusterState.PENDING);
         LOGGER.info("Cluster is now " + state);
 
         // if all storage partitions are active as well as the metadata node, then the cluster is active
         if (metadataNodeActive) {
             AppContextInfo.INSTANCE.getMetadataBootstrap().init();
-            state = ClusterState.ACTIVE;
+            setState(ClusterState.ACTIVE);
             LOGGER.info("Cluster is now " + state);
-            // Notify any waiting threads for the cluster to be active.
             notifyAll();
             // start global recovery
             AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
         }
     }
 
+    @Override
+    public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException {
+        while (state != waitForState) {
+            wait();
+        }
+    }
+
+    @Override
+    public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
+            throws HyracksDataException, InterruptedException {
+        final long startMillis = System.currentTimeMillis();
+        final long endMillis = startMillis + unit.toMillis(timeout);
+        while (state != waitForState) {
+            long millisToSleep = endMillis - System.currentTimeMillis();
+            if (millisToSleep > 0) {
+                wait(millisToSleep);
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * Returns the IO devices configured for a Node Controller
      *