Wait For ClusterState ACTIVE Before Notifying ZK
Defer notifying ZooKeeper that the cluster is up until it transitions to
ACTIVE
Change-Id: Ieaaeb2876edad9cfa3f23c2cbe00e058bdc1c8cc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1678
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index 634824c..3cdeb6f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -38,6 +39,7 @@
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Base class for language translators. Contains the common validation logic for language
@@ -52,17 +54,15 @@
if (!(ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)
&& ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted())) {
int maxWaitCycles = AppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive();
- int waitCycleCount = 0;
try {
- while (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)
- && waitCycleCount < maxWaitCycles) {
- Thread.sleep(1000);
- waitCycleCount++;
- }
+ ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
} catch (InterruptedException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Thread interrupted while waiting for cluster to be " + ClusterState.ACTIVE);
}
+ Thread.currentThread().interrupt();
}
if (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)) {
throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state."
@@ -91,6 +91,7 @@
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Thread interrupted while waiting for cluster to complete global recovery ");
}
+ Thread.currentThread().interrupt();
}
if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) {
throw new AsterixException("Cluster Global recovery is not yet complete and the system is in "
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index f3c9744..53fc7ec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -121,11 +121,7 @@
}
}
// Wait until cluster becomes active
- synchronized (ClusterStateManager.INSTANCE) {
- while (ClusterStateManager.INSTANCE.getState() != ClusterState.ACTIVE) {
- ClusterStateManager.INSTANCE.wait();
- }
- }
+ ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort());
ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index cb2bf64..578c206 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -52,6 +53,7 @@
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
import org.apache.asterix.common.api.AsterixThreadFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.ExternalProperties;
@@ -71,6 +73,7 @@
import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -194,7 +197,7 @@
jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, AppContextInfo.INSTANCE);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
- ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
+ ccServiceCtx.getControllerService().getExecutor());
// AQL rest APIs.
addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -291,13 +294,16 @@
}
private IStatementExecutorFactory getStatementExecutorFactory() {
- return ccExtensionManager.getStatementExecutorFactory(
- ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
+ return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
}
@Override
public void startupCompleted() throws Exception {
- ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
+ ccServiceCtx.getControllerService().getExecutor().submit((Callable)() -> {
+ ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+ ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
+ return null;
+ });
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index bf03d54..a753db3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.cluster;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.hyracks.api.config.IOption;
@@ -83,4 +84,17 @@
* @return A copy of the current state of the cluster partitions.
*/
ClusterPartition[] getClusterPartitons();
+
+ /**
+ * Blocks until the cluster state becomes {@code state}
+ */
+ void waitForState(ClusterState state) throws HyracksDataException, InterruptedException;
+
+ /**
+ * Blocks until the cluster state becomes {@code state}, or timeout is exhausted.
+ * @return true if the desired state was reached before timeout occurred
+ */
+ boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
+ throws HyracksDataException, InterruptedException;
+
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index f65979f..6bfbf77 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -40,11 +41,11 @@
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.control.common.controllers.NCConfig;
/**
* A holder class for properties related to the Asterix cluster.
@@ -110,6 +111,8 @@
public synchronized void setState(ClusterState state) {
this.state = state;
LOGGER.info("Cluster State is now " + state.name());
+ // Notify any waiting threads for the cluster state to change.
+ notifyAll();
}
@Override
@@ -149,27 +152,48 @@
resetClusterPartitionConstraint();
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
- state = ClusterState.UNUSABLE;
- LOGGER.info("Cluster is in UNUSABLE state");
+ setState(ClusterState.UNUSABLE);
return;
}
}
- state = ClusterState.PENDING;
+ setState(ClusterState.PENDING);
LOGGER.info("Cluster is now " + state);
// if all storage partitions are active as well as the metadata node, then the cluster is active
if (metadataNodeActive) {
AppContextInfo.INSTANCE.getMetadataBootstrap().init();
- state = ClusterState.ACTIVE;
+ setState(ClusterState.ACTIVE);
LOGGER.info("Cluster is now " + state);
- // Notify any waiting threads for the cluster to be active.
notifyAll();
// start global recovery
AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
}
}
+ @Override
+ public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException {
+ while (state != waitForState) {
+ wait();
+ }
+ }
+
+ @Override
+ public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
+ throws HyracksDataException, InterruptedException {
+ final long startMillis = System.currentTimeMillis();
+ final long endMillis = startMillis + unit.toMillis(timeout);
+ while (state != waitForState) {
+ long millisToSleep = endMillis - System.currentTimeMillis();
+ if (millisToSleep > 0) {
+ wait(millisToSleep);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Returns the IO devices configured for a Node Controller
*