This change fixes the sporadic connection refused errors
This can mostly be seen during asterix-installer tests.
The fix basically lets managix wait and listens
through zookeeper for a signal from the cluster controller that
the cluster is up and ready.
Once the cluster controller sends the signal, Managix can proceed.
Change-Id: Ib730f50ab2fb492f3cf973d1cf2f03b34e24e5b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/366
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 11ea546..5aae42a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -24,8 +24,6 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.util.component.AbstractLifeCycle;
-
import org.apache.asterix.api.http.servlet.APIServlet;
import org.apache.asterix.api.http.servlet.AQLAPIServlet;
import org.apache.asterix.api.http.servlet.ConnectorAPIServlet;
@@ -39,9 +37,11 @@
import org.apache.asterix.api.http.servlet.VersionAPIServlet;
import org.apache.asterix.common.config.AsterixBuildProperties;
import org.apache.asterix.common.api.AsterixThreadFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.feeds.api.ICentralFeedManager;
+import org.apache.asterix.event.service.ILookupService;
import org.apache.asterix.feeds.CentralFeedManager;
import org.apache.asterix.feeds.FeedLifecycleListener;
import org.apache.asterix.metadata.MetadataManager;
@@ -49,6 +49,7 @@
import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
import org.apache.asterix.metadata.cluster.ClusterManager;
import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.HyracksConnection;
@@ -56,8 +57,8 @@
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
- private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
private static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
private static final String ASTERIX_BUILD_PROP_ATTR = "org.apache.asterix.PROPS";
@@ -99,10 +100,6 @@
setupFeedServer(externalProperties);
feedServer.start();
- waitUntilServerStart(webServer);
- waitUntilServerStart(jsonAPIServer);
- waitUntilServerStart(feedServer);
-
ExternalLibraryBootstrap.setUpExternaLibraries(false);
centralFeedManager = CentralFeedManager.getInstance();
centralFeedManager.start();
@@ -114,25 +111,20 @@
ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
}
- private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
- while (!webServer.isStarted()) {
- if (webServer.isFailed()) {
- throw new Exception("Server failed to start");
- }
- wait(1000);
- }
- }
-
@Override
public void stop() throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Stopping Asterix cluster controller");
}
AsterixStateProxy.unregisterRemoteObject();
-
+ // Stop servers
webServer.stop();
jsonAPIServer.stop();
feedServer.stop();
+ // Make sure servers are stopped before proceeding
+ webServer.join();
+ jsonAPIServer.join();
+ feedServer.join();
}
private IHyracksClientConnection getNewHyracksClientConnection() throws Exception {
@@ -165,8 +157,6 @@
context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.getInstance());
-
-
jsonAPIServer.setHandler(context);
context.addServlet(new ServletHolder(new QueryAPIServlet()), "/query");
context.addServlet(new ServletHolder(new QueryStatusAPIServlet()), "/query/status");
@@ -191,6 +181,16 @@
feedServer.setHandler(context);
context.addServlet(new ServletHolder(new FeedServlet()), "/");
- // add paths here
}
-}
+
+ @Override
+ public void startupCompleted() throws Exception {
+ // Notify Zookeeper that the startup is complete
+ ILookupService zookeeperService = ClusterManager.getLookupService();
+ if (zookeeperService != null) {
+ // Our asterix app runtimes tests don't use zookeeper
+ zookeeperService.reportClusterState(AsterixClusterProperties.INSTANCE.getCluster().getInstanceName(),
+ ClusterState.ACTIVE);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index 07d4b21..75f1f82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -26,6 +26,7 @@
}
public enum ClusterState {
+ STARTING,
ACTIVE,
UNUSABLE
}
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java b/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java
index f6cfdcd..e6c6986 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java
@@ -23,7 +23,6 @@
import java.util.Date;
import java.util.List;
-import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Property;
import org.apache.asterix.event.schema.cluster.Cluster;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 06b0f9a..33ba787 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -30,13 +30,9 @@
import java.io.InputStream;
import java.io.StringWriter;
import java.net.InetAddress;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import java.util.Random;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
@@ -50,7 +46,6 @@
import javax.xml.bind.Marshaller;
import org.apache.commons.io.IOUtils;
-
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Coredump;
import org.apache.asterix.common.configuration.Store;
@@ -179,26 +174,6 @@
new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
}
- private static void injectAsterixLogPropertyFile(String asterixInstanceDir, AsterixInstance asterixInstance)
- throws IOException, EventException {
- final String asterixJarPath = asterixJarPath(asterixInstance, asterixInstanceDir);
- File sourceJar1 = new File(asterixJarPath);
- Properties txnLogProperties = new Properties();
- URLClassLoader urlClassLoader = new URLClassLoader(new URL[] { sourceJar1.toURI().toURL() });
- InputStream in = urlClassLoader.getResourceAsStream(TXN_LOG_CONFIGURATION_FILE);
- if (in != null) {
- txnLogProperties.load(in);
- }
-
- writeAsterixLogConfigurationFile(asterixInstance, txnLogProperties);
-
- File sourceJar2 = new File(asterixJarPath);
- File replacementFile = new File(asterixInstanceDir + File.separator + "log.properties");
- replaceInJar(sourceJar2, TXN_LOG_CONFIGURATION_FILE, replacementFile);
-
- new File(asterixInstanceDir + File.separator + "log.properties").delete();
- }
-
private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
throws IOException, EventException, JAXBException {
File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
@@ -325,35 +300,7 @@
os.close();
}
- private static void writeAsterixLogConfigurationFile(AsterixInstance asterixInstance, Properties logProperties)
- throws IOException, EventException {
- String asterixInstanceName = asterixInstance.getName();
- Cluster cluster = asterixInstance.getCluster();
- StringBuffer conf = new StringBuffer();
- for (Map.Entry<Object, Object> p : logProperties.entrySet()) {
- conf.append(p.getKey() + "=" + p.getValue() + "\n");
- }
- for (Node node : cluster.getNode()) {
- String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
- if (txnLogDir == null) {
- throw new EventException("Transaction log directory (txn_log_dir) not configured for node: "
- + node.getId());
- }
- conf.append(asterixInstanceName + "_" + node.getId() + "." + TXN_LOG_DIR_KEY_SUFFIX + "=" + txnLogDir
- + "\n");
- }
- List<org.apache.asterix.common.configuration.Property> properties = asterixInstance.getAsterixConfiguration()
- .getProperty();
- for (org.apache.asterix.common.configuration.Property p : properties) {
- if (p.getName().trim().toLowerCase().contains("log")) {
- conf.append(p.getValue() + "=" + p.getValue());
- }
- }
- dumpToFile(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName + File.separator
- + "log.properties", conf.toString());
-
- }
public static void unzip(String sourceFile, String destDir) throws IOException {
BufferedOutputStream dest = null;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
new file mode 100644
index 0000000..fdd84af
--- /dev/null
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.event.service;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+//A zookeeper watcher that watches the change in the state of the cluster
+public class ClusterStateWatcher implements Watcher {
+ private static Integer mutex;
+ private static ZooKeeper zk;
+ private String clusterStatePath;
+ private boolean done = false;
+ private ClusterState clusterState = ClusterState.STARTING;
+ private boolean failed = false;
+ private Exception failureCause = null;
+ private static Logger LOGGER = Logger.getLogger(ClusterStateWatcher.class.getName());
+
+ public ClusterStateWatcher(ZooKeeper zk, String clusterName) {
+ if (mutex == null) {
+ mutex = new Integer(-1);
+ }
+ this.clusterStatePath = ZooKeeperService.ASTERIX_INSTANCE_BASE_PATH + File.separator + clusterName
+ + ZooKeeperService.ASTERIX_INSTANCE_STATE_PATH;
+ ClusterStateWatcher.zk = zk;
+ }
+
+ public ClusterState waitForClusterStart() throws Exception {
+ while (true) {
+ synchronized (mutex) {
+ if (done) {
+ if (failed) {
+ LOGGER.error("An error took place in the startup sequence. Check the CC logs.");
+ throw failureCause;
+ } else {
+ return clusterState;
+ }
+ } else {
+ mutex.wait();
+ }
+ }
+ }
+ }
+
+ private void monitorStateChange() {
+ try {
+ while (true) {
+ synchronized (mutex) {
+ // Get the cluster state
+ List<String> list = zk.getChildren(clusterStatePath, this);
+ if (list.size() == 0) {
+ // Cluster state not found, wait to be awaken by Zookeeper
+ mutex.wait();
+ } else {
+ // Cluster state found
+ byte[] b = zk.getData(clusterStatePath + ZooKeeperService.ASTERIX_INSTANCE_STATE_REPORT, false,
+ null);
+ zk.delete(clusterStatePath + ZooKeeperService.ASTERIX_INSTANCE_STATE_REPORT, 0);
+ clusterState = ClusterState.values()[(int) b[0]];
+ done = true;
+ mutex.notifyAll();
+ return;
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Exception was thrown, let Managix know that a failure took place
+ failed = true;
+ done = true;
+ failureCause = e;
+ }
+ }
+
+ public void startMonitoringThread() {
+ Runnable monitoringThread = new Runnable() {
+ @Override
+ public void run() {
+ monitorStateChange();
+ }
+ };
+ // Start the monitoring thread
+ (new Thread(monitoringThread)).start();
+ }
+
+ @Override
+ synchronized public void process(WatchedEvent event) {
+ synchronized (mutex) {
+ mutex.notifyAll();
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java
index 78d6e0f..34252e3 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.event.model.AsterixInstance;
import org.apache.asterix.installer.schema.conf.Configuration;
@@ -42,4 +43,8 @@
public List<AsterixInstance> getAsterixInstances() throws Exception;
public void updateAsterixInstance(AsterixInstance updatedInstance) throws Exception;
+
+ public void reportClusterState(String instanceName, ClusterState active) throws Exception;
+
+ public ClusterStateWatcher startWatchingClusterState(String asterixInstanceName);
}
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
index ea9056c..96fb6ec 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
@@ -34,10 +34,11 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
-
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.event.error.EventException;
import org.apache.asterix.event.model.AsterixInstance;
import org.apache.asterix.installer.schema.conf.Configuration;
@@ -55,8 +56,10 @@
private boolean isRunning = false;
private ZooKeeper zk;
private String zkConnectionString;
- private static final String ASTERIX_INSTANCE_BASE_PATH = "/Asterix";
- private static final int DEFAULT_NODE_VERSION = -1;
+ public static final String ASTERIX_INSTANCE_BASE_PATH = File.separator + "Asterix";
+ public static final String ASTERIX_INSTANCE_STATE_PATH = File.separator + "state";
+ public static final String ASTERIX_INSTANCE_STATE_REPORT = File.separator + "clusterState";
+ public static final int DEFAULT_NODE_VERSION = -1;
private LinkedBlockingQueue<String> msgQ = new LinkedBlockingQueue<String>();
private ZooKeeperWatcher watcher = new ZooKeeperWatcher(msgQ);
@@ -144,6 +147,9 @@
ObjectOutputStream o = new ObjectOutputStream(b);
o.writeObject(asterixInstance);
zk.create(instanceBasePath, b.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ // Create a place to put the state of the cluster in
+ String instanceStatePath = instanceBasePath + ASTERIX_INSTANCE_STATE_PATH;
+ zk.create(instanceStatePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
private void createRootIfNotExist() throws Exception {
@@ -153,6 +159,9 @@
zk.create(ASTERIX_INSTANCE_BASE_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
+ // Is this the right way to handle the exception (try again? forever?)
+ LOGGER.error("An error took place when creating the root in Zookeeper");
+ e.printStackTrace();
createRootIfNotExist();
}
}
@@ -167,14 +176,22 @@
return readAsterixInstanceObject(asterixInstanceBytes);
}
- public boolean exists(String asterixInstanceName) throws Exception {
- return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + asterixInstanceName, false) != null;
+ public boolean exists(String path) throws Exception {
+ return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + path, false) != null;
}
public void removeAsterixInstance(String name) throws Exception {
if (!exists(name)) {
throw new EventException("Asterix instance by name " + name + " does not exists.");
}
+ if (exists(name + ASTERIX_INSTANCE_STATE_PATH)) {
+ if (exists(name + ASTERIX_INSTANCE_STATE_PATH + File.separator + "clusterState")) {
+ zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name + ASTERIX_INSTANCE_STATE_PATH
+ + ASTERIX_INSTANCE_STATE_REPORT, DEFAULT_NODE_VERSION);
+ }
+ zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name + ASTERIX_INSTANCE_STATE_PATH,
+ DEFAULT_NODE_VERSION);
+ }
zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, DEFAULT_NODE_VERSION);
}
@@ -202,6 +219,25 @@
writeAsterixInstance(updatedInstance);
}
+ @Override
+ public ClusterStateWatcher startWatchingClusterState(String instanceName) {
+ ClusterStateWatcher watcher = new ClusterStateWatcher(zk, instanceName);
+ watcher.startMonitoringThread();
+ return watcher;
+ }
+
+ @Override
+ public void reportClusterState(String instanceName, ClusterState state) throws Exception {
+ String clusterStatePath = ZooKeeperService.ASTERIX_INSTANCE_BASE_PATH + File.separator + instanceName
+ + ASTERIX_INSTANCE_STATE_PATH;
+ Integer value = state.ordinal();
+ byte[] stateValue = new byte[] { value.byteValue() };
+ // Create a place to put the state of the cluster in
+ zk.create(clusterStatePath + ASTERIX_INSTANCE_STATE_REPORT, stateValue, Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ return;
+ }
+
}
class ZooKeeperWatcher implements Watcher {
@@ -214,10 +250,8 @@
}
public void process(WatchedEvent wEvent) {
- switch (wEvent.getState()) {
- case SyncConnected:
- msgQ.add("connected");
- break;
+ if (wEvent.getState() == KeeperState.SyncConnected) {
+ msgQ.add("connected");
}
}
diff --git a/asterix-events/src/main/resources/events/cc_start/cc_start.sh b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
index b667f16..24d3432 100644
--- a/asterix-events/src/main/resources/events/cc_start/cc_start.sh
+++ b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
@@ -22,4 +22,4 @@
mkdir -p $LOG_DIR
fi
cd $WORKING_DIR
-$ASTERIX_HOME/bin/asterixcc -client-net-ip-address $CLIENT_NET_IP -client-net-port $CLIENT_NET_PORT -cluster-net-ip-address $CLUSTER_NET_IP -cluster-net-port $CLUSTER_NET_PORT -http-port $HTTP_PORT &> $LOG_DIR/cc.log
+$ASTERIX_HOME/bin/asterixcc -client-net-ip-address $CLIENT_NET_IP -client-net-port $CLIENT_NET_PORT -cluster-net-ip-address $CLUSTER_NET_IP -cluster-net-port $CLUSTER_NET_PORT -http-port $HTTP_PORT &> $LOG_DIR/cc.log
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java
index 10fe766..6db0648 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java
@@ -21,7 +21,7 @@
import java.io.File;
import org.kohsuke.args4j.Option;
-
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.event.error.VerificationUtil;
import org.apache.asterix.event.management.AsterixEventServiceClient;
@@ -32,6 +32,7 @@
import org.apache.asterix.event.schema.pattern.Patterns;
import org.apache.asterix.event.service.AsterixEventService;
import org.apache.asterix.event.service.AsterixEventServiceUtil;
+import org.apache.asterix.event.service.ClusterStateWatcher;
import org.apache.asterix.event.service.ServiceProvider;
import org.apache.asterix.event.util.PatternCreator;
import org.apache.asterix.installer.driver.InstallerDriver;
@@ -64,17 +65,25 @@
AsterixEventServiceUtil.createClusterProperties(cluster, asterixConfiguration);
AsterixEventServiceClient eventrixClient = AsterixEventService.getAsterixEventServiceClient(cluster, true,
false);
-
+ // Store the cluster initially in Zookeeper and start watching
+ ServiceProvider.INSTANCE.getLookupService().writeAsterixInstance(asterixInstance);
+ ClusterStateWatcher stateWatcher = ServiceProvider.INSTANCE.getLookupService().startWatchingClusterState(
+ asterixInstanceName);
Patterns asterixBinarytrasnferPattern = PatternCreator.INSTANCE.getAsterixBinaryTransferPattern(
asterixInstanceName, cluster);
eventrixClient.submit(asterixBinarytrasnferPattern);
-
Patterns patterns = PatternCreator.INSTANCE.getStartAsterixPattern(asterixInstanceName, cluster, true);
eventrixClient.submit(patterns);
+ // Check the cluster state
+ ClusterState clusterState = stateWatcher.waitForClusterStart();
+ if (clusterState != ClusterState.ACTIVE) {
+ throw new Exception("CC failed to start");
+ }
+
AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(asterixInstance);
VerificationUtil.updateInstanceWithRuntimeDescription(asterixInstance, runtimeState, true);
- ServiceProvider.INSTANCE.getLookupService().writeAsterixInstance(asterixInstance);
+ ServiceProvider.INSTANCE.getLookupService().updateAsterixInstance(asterixInstance);
AsterixEventServiceUtil.deleteDirectory(InstallerDriver.getManagixHome() + File.separator
+ InstallerDriver.ASTERIX_DIR + File.separator + asterixInstanceName);
LOGGER.info(asterixInstance.getDescription(false));
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java
index 5c53e42..551bc11 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java
@@ -21,7 +21,7 @@
import java.io.File;
import org.kohsuke.args4j.Option;
-
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.event.error.VerificationUtil;
import org.apache.asterix.event.management.AsterixEventServiceClient;
import org.apache.asterix.event.model.AsterixInstance;
@@ -30,6 +30,7 @@
import org.apache.asterix.event.schema.pattern.Patterns;
import org.apache.asterix.event.service.AsterixEventService;
import org.apache.asterix.event.service.AsterixEventServiceUtil;
+import org.apache.asterix.event.service.ClusterStateWatcher;
import org.apache.asterix.event.service.ServiceProvider;
import org.apache.asterix.event.util.PatternCreator;
import org.apache.asterix.installer.driver.InstallerDriver;
@@ -47,9 +48,18 @@
Patterns asterixBinaryTransferPattern = PatternCreator.INSTANCE.getAsterixBinaryTransferPattern(
asterixInstanceName, instance.getCluster());
client.submit(asterixBinaryTransferPattern);
+ // Start the watcher
+ ClusterStateWatcher stateWatcher = ServiceProvider.INSTANCE.getLookupService().startWatchingClusterState(
+ asterixInstanceName);
AsterixEventServiceUtil.createClusterProperties(instance.getCluster(), instance.getAsterixConfiguration());
- Patterns patterns = PatternCreator.INSTANCE.getStartAsterixPattern(asterixInstanceName, instance.getCluster(), false);
+ Patterns patterns = PatternCreator.INSTANCE.getStartAsterixPattern(asterixInstanceName, instance.getCluster(),
+ false);
client.submit(patterns);
+ // Check the cluster state
+ ClusterState clusterState = stateWatcher.waitForClusterStart();
+ if (clusterState != ClusterState.ACTIVE) {
+ throw new Exception("CC failed to start");
+ }
AsterixEventServiceUtil.deleteDirectory(InstallerDriver.getManagixHome() + File.separator
+ InstallerDriver.ASTERIX_DIR + File.separator + asterixInstanceName);
AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(instance);
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
index 3b19372..52ee4b4 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
@@ -169,6 +169,10 @@
case INACTIVE:
command = "start -n " + ASTERIX_INSTANCE_NAME;
break;
+ case UNUSABLE:
+ command = "delete -n " + ASTERIX_INSTANCE_NAME;
+ cmdHandler.processCommand(command.split(" "));
+ throw new Exception("Cluster state was Unusable");
}
cmdHandler.processCommand(command.split(" "));
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 2d2c612..3e37694 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -54,8 +54,6 @@
public static ClusterManager INSTANCE = new ClusterManager();
- private static String eventsDir = System.getenv("user.dir") + File.separator + "eventrix";
-
private static AsterixEventServiceClient client;
private static ILookupService lookupService;
@@ -117,7 +115,8 @@
String hostId = node.getId();
String nodeControllerId = asterixInstanceName + "_" + node.getId();
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- Pattern startNC = PatternCreator.INSTANCE.createNCStartPattern(ccHost, hostId, nodeControllerId, iodevices, false);
+ Pattern startNC = PatternCreator.INSTANCE.createNCStartPattern(ccHost, hostId, nodeControllerId, iodevices,
+ false);
pattern.add(startNC);
Patterns startNCPattern = new Patterns(pattern);
client.submit(startNCPattern);
@@ -170,4 +169,8 @@
public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers() {
return eventSubscribers;
}
+
+ public static ILookupService getLookupService() {
+ return lookupService;
+ }
}