This change fixes the sporadic connection refused errors 

This can mostly be seen during asterix-installer tests.
The fix basically lets managix wait and listens
through zookeeper for a signal from the cluster controller that
the cluster is up and ready.
Once the cluster controller sends the signal, Managix can proceed.

Change-Id: Ib730f50ab2fb492f3cf973d1cf2f03b34e24e5b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/366
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 11ea546..5aae42a 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -24,8 +24,6 @@
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.util.component.AbstractLifeCycle;
-
 import org.apache.asterix.api.http.servlet.APIServlet;
 import org.apache.asterix.api.http.servlet.AQLAPIServlet;
 import org.apache.asterix.api.http.servlet.ConnectorAPIServlet;
@@ -39,9 +37,11 @@
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
 import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.api.AsterixThreadFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.feeds.api.ICentralFeedManager;
+import org.apache.asterix.event.service.ILookupService;
 import org.apache.asterix.feeds.CentralFeedManager;
 import org.apache.asterix.feeds.FeedLifecycleListener;
 import org.apache.asterix.metadata.MetadataManager;
@@ -49,6 +49,7 @@
 import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -56,8 +57,8 @@
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 
 public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
-    private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
 
+    private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
     private static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION";
     private static final String ASTERIX_BUILD_PROP_ATTR = "org.apache.asterix.PROPS";
 
@@ -99,10 +100,6 @@
         setupFeedServer(externalProperties);
         feedServer.start();
 
-        waitUntilServerStart(webServer);
-        waitUntilServerStart(jsonAPIServer);
-        waitUntilServerStart(feedServer);
-
         ExternalLibraryBootstrap.setUpExternaLibraries(false);
         centralFeedManager = CentralFeedManager.getInstance();
         centralFeedManager.start();
@@ -114,25 +111,20 @@
         ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE);
     }
 
-    private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception {
-        while (!webServer.isStarted()) {
-            if (webServer.isFailed()) {
-                throw new Exception("Server failed to start");
-            }
-            wait(1000);
-        }
-    }
-
     @Override
     public void stop() throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopping Asterix cluster controller");
         }
         AsterixStateProxy.unregisterRemoteObject();
-
+        // Stop servers
         webServer.stop();
         jsonAPIServer.stop();
         feedServer.stop();
+        // Make sure servers are stopped before proceeding
+        webServer.join();
+        jsonAPIServer.join();
+        feedServer.join();
     }
 
     private IHyracksClientConnection getNewHyracksClientConnection() throws Exception {
@@ -165,8 +157,6 @@
         context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.getInstance());
 
-
-
         jsonAPIServer.setHandler(context);
         context.addServlet(new ServletHolder(new QueryAPIServlet()), "/query");
         context.addServlet(new ServletHolder(new QueryStatusAPIServlet()), "/query/status");
@@ -191,6 +181,16 @@
         feedServer.setHandler(context);
         context.addServlet(new ServletHolder(new FeedServlet()), "/");
 
-        // add paths here
     }
-}
+
+    @Override
+    public void startupCompleted() throws Exception {
+        // Notify Zookeeper that the startup is complete
+        ILookupService zookeeperService = ClusterManager.getLookupService();
+        if (zookeeperService != null) {
+            // Our asterix app runtimes tests don't use zookeeper
+            zookeeperService.reportClusterState(AsterixClusterProperties.INSTANCE.getCluster().getInstanceName(),
+                    ClusterState.ACTIVE);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index 07d4b21..75f1f82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -26,6 +26,7 @@
     }
 
     public enum ClusterState {
+        STARTING,
         ACTIVE,
         UNUSABLE
     }
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java b/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java
index f6cfdcd..e6c6986 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/model/AsterixInstance.java
@@ -23,7 +23,6 @@
 import java.util.Date;
 import java.util.List;
 
-import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.event.schema.cluster.Cluster;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 06b0f9a..33ba787 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -30,13 +30,9 @@
 import java.io.InputStream;
 import java.io.StringWriter;
 import java.net.InetAddress;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.Random;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
@@ -50,7 +46,6 @@
 import javax.xml.bind.Marshaller;
 
 import org.apache.commons.io.IOUtils;
-
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Coredump;
 import org.apache.asterix.common.configuration.Store;
@@ -179,26 +174,6 @@
         new File(asterixInstanceDir + File.separator + ASTERIX_CONFIGURATION_FILE).delete();
     }
 
-    private static void injectAsterixLogPropertyFile(String asterixInstanceDir, AsterixInstance asterixInstance)
-            throws IOException, EventException {
-        final String asterixJarPath = asterixJarPath(asterixInstance, asterixInstanceDir);
-        File sourceJar1 = new File(asterixJarPath);
-        Properties txnLogProperties = new Properties();
-        URLClassLoader urlClassLoader = new URLClassLoader(new URL[] { sourceJar1.toURI().toURL() });
-        InputStream in = urlClassLoader.getResourceAsStream(TXN_LOG_CONFIGURATION_FILE);
-        if (in != null) {
-            txnLogProperties.load(in);
-        }
-
-        writeAsterixLogConfigurationFile(asterixInstance, txnLogProperties);
-
-        File sourceJar2 = new File(asterixJarPath);
-        File replacementFile = new File(asterixInstanceDir + File.separator + "log.properties");
-        replaceInJar(sourceJar2, TXN_LOG_CONFIGURATION_FILE, replacementFile);
-
-        new File(asterixInstanceDir + File.separator + "log.properties").delete();
-    }
-
     private static void injectAsterixClusterConfigurationFile(String asterixInstanceDir, AsterixInstance asterixInstance)
             throws IOException, EventException, JAXBException {
         File sourceJar = new File(asterixJarPath(asterixInstance, asterixInstanceDir));
@@ -325,35 +300,7 @@
         os.close();
     }
 
-    private static void writeAsterixLogConfigurationFile(AsterixInstance asterixInstance, Properties logProperties)
-            throws IOException, EventException {
-        String asterixInstanceName = asterixInstance.getName();
-        Cluster cluster = asterixInstance.getCluster();
-        StringBuffer conf = new StringBuffer();
-        for (Map.Entry<Object, Object> p : logProperties.entrySet()) {
-            conf.append(p.getKey() + "=" + p.getValue() + "\n");
-        }
 
-        for (Node node : cluster.getNode()) {
-            String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
-            if (txnLogDir == null) {
-                throw new EventException("Transaction log directory (txn_log_dir) not configured for node: "
-                        + node.getId());
-            }
-            conf.append(asterixInstanceName + "_" + node.getId() + "." + TXN_LOG_DIR_KEY_SUFFIX + "=" + txnLogDir
-                    + "\n");
-        }
-        List<org.apache.asterix.common.configuration.Property> properties = asterixInstance.getAsterixConfiguration()
-                .getProperty();
-        for (org.apache.asterix.common.configuration.Property p : properties) {
-            if (p.getName().trim().toLowerCase().contains("log")) {
-                conf.append(p.getValue() + "=" + p.getValue());
-            }
-        }
-        dumpToFile(AsterixEventService.getAsterixDir() + File.separator + asterixInstanceName + File.separator
-                + "log.properties", conf.toString());
-
-    }
 
     public static void unzip(String sourceFile, String destDir) throws IOException {
         BufferedOutputStream dest = null;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
new file mode 100644
index 0000000..fdd84af
--- /dev/null
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.asterix.event.service;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+//A zookeeper watcher that watches the change in the state of the cluster
+public class ClusterStateWatcher implements Watcher {
+    private static Integer mutex;
+    private static ZooKeeper zk;
+    private String clusterStatePath;
+    private boolean done = false;
+    private ClusterState clusterState = ClusterState.STARTING;
+    private boolean failed = false;
+    private Exception failureCause = null;
+    private static Logger LOGGER = Logger.getLogger(ClusterStateWatcher.class.getName());
+
+    public ClusterStateWatcher(ZooKeeper zk, String clusterName) {
+        if (mutex == null) {
+            mutex = new Integer(-1);
+        }
+        this.clusterStatePath = ZooKeeperService.ASTERIX_INSTANCE_BASE_PATH + File.separator + clusterName
+                + ZooKeeperService.ASTERIX_INSTANCE_STATE_PATH;
+        ClusterStateWatcher.zk = zk;
+    }
+
+    public ClusterState waitForClusterStart() throws Exception {
+        while (true) {
+            synchronized (mutex) {
+                if (done) {
+                    if (failed) {
+                        LOGGER.error("An error took place in the startup sequence. Check the CC logs.");
+                        throw failureCause;
+                    } else {
+                        return clusterState;
+                    }
+                } else {
+                    mutex.wait();
+                }
+            }
+        }
+    }
+
+    private void monitorStateChange() {
+        try {
+            while (true) {
+                synchronized (mutex) {
+                    // Get the cluster state
+                    List<String> list = zk.getChildren(clusterStatePath, this);
+                    if (list.size() == 0) {
+                        // Cluster state not found, wait to be awaken by Zookeeper
+                        mutex.wait();
+                    } else {
+                        // Cluster state found
+                        byte[] b = zk.getData(clusterStatePath + ZooKeeperService.ASTERIX_INSTANCE_STATE_REPORT, false,
+                                null);
+                        zk.delete(clusterStatePath + ZooKeeperService.ASTERIX_INSTANCE_STATE_REPORT, 0);
+                        clusterState = ClusterState.values()[(int) b[0]];
+                        done = true;
+                        mutex.notifyAll();
+                        return;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Exception was thrown, let Managix know that a failure took place
+            failed = true;
+            done = true;
+            failureCause = e;
+        }
+    }
+
+    public void startMonitoringThread() {
+        Runnable monitoringThread = new Runnable() {
+            @Override
+            public void run() {
+                monitorStateChange();
+            }
+        };
+        // Start the monitoring thread
+        (new Thread(monitoringThread)).start();
+    }
+
+    @Override
+    synchronized public void process(WatchedEvent event) {
+        synchronized (mutex) {
+            mutex.notifyAll();
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java
index 78d6e0f..34252e3 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ILookupService.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.event.model.AsterixInstance;
 import org.apache.asterix.installer.schema.conf.Configuration;
 
@@ -42,4 +43,8 @@
     public List<AsterixInstance> getAsterixInstances() throws Exception;
 
     public void updateAsterixInstance(AsterixInstance updatedInstance) throws Exception;
+
+    public void reportClusterState(String instanceName, ClusterState active) throws Exception;
+
+    public ClusterStateWatcher startWatchingClusterState(String asterixInstanceName);
 }
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
index ea9056c..96fb6ec 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/ZooKeeperService.java
@@ -34,10 +34,11 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
-
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.event.error.EventException;
 import org.apache.asterix.event.model.AsterixInstance;
 import org.apache.asterix.installer.schema.conf.Configuration;
@@ -55,8 +56,10 @@
     private boolean isRunning = false;
     private ZooKeeper zk;
     private String zkConnectionString;
-    private static final String ASTERIX_INSTANCE_BASE_PATH = "/Asterix";
-    private static final int DEFAULT_NODE_VERSION = -1;
+    public static final String ASTERIX_INSTANCE_BASE_PATH = File.separator + "Asterix";
+    public static final String ASTERIX_INSTANCE_STATE_PATH = File.separator + "state";
+    public static final String ASTERIX_INSTANCE_STATE_REPORT = File.separator + "clusterState";
+    public static final int DEFAULT_NODE_VERSION = -1;
     private LinkedBlockingQueue<String> msgQ = new LinkedBlockingQueue<String>();
     private ZooKeeperWatcher watcher = new ZooKeeperWatcher(msgQ);
 
@@ -144,6 +147,9 @@
         ObjectOutputStream o = new ObjectOutputStream(b);
         o.writeObject(asterixInstance);
         zk.create(instanceBasePath, b.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        // Create a place to put the state of the cluster in
+        String instanceStatePath = instanceBasePath + ASTERIX_INSTANCE_STATE_PATH;
+        zk.create(instanceStatePath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     private void createRootIfNotExist() throws Exception {
@@ -153,6 +159,9 @@
                 zk.create(ASTERIX_INSTANCE_BASE_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
         } catch (Exception e) {
+            // Is this the right way to handle the exception (try again? forever?)
+            LOGGER.error("An error took place when creating the root in Zookeeper");
+            e.printStackTrace();
             createRootIfNotExist();
         }
     }
@@ -167,14 +176,22 @@
         return readAsterixInstanceObject(asterixInstanceBytes);
     }
 
-    public boolean exists(String asterixInstanceName) throws Exception {
-        return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + asterixInstanceName, false) != null;
+    public boolean exists(String path) throws Exception {
+        return zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + path, false) != null;
     }
 
     public void removeAsterixInstance(String name) throws Exception {
         if (!exists(name)) {
             throw new EventException("Asterix instance by name " + name + " does not exists.");
         }
+        if (exists(name + ASTERIX_INSTANCE_STATE_PATH)) {
+            if (exists(name + ASTERIX_INSTANCE_STATE_PATH + File.separator + "clusterState")) {
+                zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name + ASTERIX_INSTANCE_STATE_PATH
+                        + ASTERIX_INSTANCE_STATE_REPORT, DEFAULT_NODE_VERSION);
+            }
+            zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name + ASTERIX_INSTANCE_STATE_PATH,
+                    DEFAULT_NODE_VERSION);
+        }
         zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + name, DEFAULT_NODE_VERSION);
     }
 
@@ -202,6 +219,25 @@
         writeAsterixInstance(updatedInstance);
     }
 
+    @Override
+    public ClusterStateWatcher startWatchingClusterState(String instanceName) {
+        ClusterStateWatcher watcher = new ClusterStateWatcher(zk, instanceName);
+        watcher.startMonitoringThread();
+        return watcher;
+    }
+
+    @Override
+    public void reportClusterState(String instanceName, ClusterState state) throws Exception {
+        String clusterStatePath = ZooKeeperService.ASTERIX_INSTANCE_BASE_PATH + File.separator + instanceName
+                + ASTERIX_INSTANCE_STATE_PATH;
+        Integer value = state.ordinal();
+        byte[] stateValue = new byte[] { value.byteValue() };
+        // Create a place to put the state of the cluster in
+        zk.create(clusterStatePath + ASTERIX_INSTANCE_STATE_REPORT, stateValue, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        return;
+    }
+
 }
 
 class ZooKeeperWatcher implements Watcher {
@@ -214,10 +250,8 @@
     }
 
     public void process(WatchedEvent wEvent) {
-        switch (wEvent.getState()) {
-            case SyncConnected:
-                msgQ.add("connected");
-                break;
+        if (wEvent.getState() == KeeperState.SyncConnected) {
+            msgQ.add("connected");
         }
     }
 
diff --git a/asterix-events/src/main/resources/events/cc_start/cc_start.sh b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
index b667f16..24d3432 100644
--- a/asterix-events/src/main/resources/events/cc_start/cc_start.sh
+++ b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
@@ -22,4 +22,4 @@
   mkdir -p $LOG_DIR
 fi
 cd $WORKING_DIR
-$ASTERIX_HOME/bin/asterixcc -client-net-ip-address $CLIENT_NET_IP -client-net-port $CLIENT_NET_PORT -cluster-net-ip-address $CLUSTER_NET_IP -cluster-net-port $CLUSTER_NET_PORT -http-port $HTTP_PORT  &> $LOG_DIR/cc.log
+$ASTERIX_HOME/bin/asterixcc -client-net-ip-address $CLIENT_NET_IP -client-net-port $CLIENT_NET_PORT -cluster-net-ip-address $CLUSTER_NET_IP -cluster-net-port $CLUSTER_NET_PORT -http-port $HTTP_PORT &> $LOG_DIR/cc.log
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java
index 10fe766..6db0648 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/CreateCommand.java
@@ -21,7 +21,7 @@
 import java.io.File;
 
 import org.kohsuke.args4j.Option;
-
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.event.error.VerificationUtil;
 import org.apache.asterix.event.management.AsterixEventServiceClient;
@@ -32,6 +32,7 @@
 import org.apache.asterix.event.schema.pattern.Patterns;
 import org.apache.asterix.event.service.AsterixEventService;
 import org.apache.asterix.event.service.AsterixEventServiceUtil;
+import org.apache.asterix.event.service.ClusterStateWatcher;
 import org.apache.asterix.event.service.ServiceProvider;
 import org.apache.asterix.event.util.PatternCreator;
 import org.apache.asterix.installer.driver.InstallerDriver;
@@ -64,17 +65,25 @@
         AsterixEventServiceUtil.createClusterProperties(cluster, asterixConfiguration);
         AsterixEventServiceClient eventrixClient = AsterixEventService.getAsterixEventServiceClient(cluster, true,
                 false);
-
+        // Store the cluster initially in Zookeeper and start watching
+        ServiceProvider.INSTANCE.getLookupService().writeAsterixInstance(asterixInstance);
+        ClusterStateWatcher stateWatcher = ServiceProvider.INSTANCE.getLookupService().startWatchingClusterState(
+                asterixInstanceName);
         Patterns asterixBinarytrasnferPattern = PatternCreator.INSTANCE.getAsterixBinaryTransferPattern(
                 asterixInstanceName, cluster);
         eventrixClient.submit(asterixBinarytrasnferPattern);
-
         Patterns patterns = PatternCreator.INSTANCE.getStartAsterixPattern(asterixInstanceName, cluster, true);
         eventrixClient.submit(patterns);
 
+        // Check the cluster state
+        ClusterState clusterState = stateWatcher.waitForClusterStart();
+        if (clusterState != ClusterState.ACTIVE) {
+            throw new Exception("CC failed to start");
+        }
+
         AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(asterixInstance);
         VerificationUtil.updateInstanceWithRuntimeDescription(asterixInstance, runtimeState, true);
-        ServiceProvider.INSTANCE.getLookupService().writeAsterixInstance(asterixInstance);
+        ServiceProvider.INSTANCE.getLookupService().updateAsterixInstance(asterixInstance);
         AsterixEventServiceUtil.deleteDirectory(InstallerDriver.getManagixHome() + File.separator
                 + InstallerDriver.ASTERIX_DIR + File.separator + asterixInstanceName);
         LOGGER.info(asterixInstance.getDescription(false));
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java
index 5c53e42..551bc11 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/StartCommand.java
@@ -21,7 +21,7 @@
 import java.io.File;
 
 import org.kohsuke.args4j.Option;
-
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.event.error.VerificationUtil;
 import org.apache.asterix.event.management.AsterixEventServiceClient;
 import org.apache.asterix.event.model.AsterixInstance;
@@ -30,6 +30,7 @@
 import org.apache.asterix.event.schema.pattern.Patterns;
 import org.apache.asterix.event.service.AsterixEventService;
 import org.apache.asterix.event.service.AsterixEventServiceUtil;
+import org.apache.asterix.event.service.ClusterStateWatcher;
 import org.apache.asterix.event.service.ServiceProvider;
 import org.apache.asterix.event.util.PatternCreator;
 import org.apache.asterix.installer.driver.InstallerDriver;
@@ -47,9 +48,18 @@
         Patterns asterixBinaryTransferPattern = PatternCreator.INSTANCE.getAsterixBinaryTransferPattern(
                 asterixInstanceName, instance.getCluster());
         client.submit(asterixBinaryTransferPattern);
+        // Start the watcher
+        ClusterStateWatcher stateWatcher = ServiceProvider.INSTANCE.getLookupService().startWatchingClusterState(
+                asterixInstanceName);
         AsterixEventServiceUtil.createClusterProperties(instance.getCluster(), instance.getAsterixConfiguration());
-        Patterns patterns = PatternCreator.INSTANCE.getStartAsterixPattern(asterixInstanceName, instance.getCluster(), false);
+        Patterns patterns = PatternCreator.INSTANCE.getStartAsterixPattern(asterixInstanceName, instance.getCluster(),
+                false);
         client.submit(patterns);
+        // Check the cluster state
+        ClusterState clusterState = stateWatcher.waitForClusterStart();
+        if (clusterState != ClusterState.ACTIVE) {
+            throw new Exception("CC failed to start");
+        }
         AsterixEventServiceUtil.deleteDirectory(InstallerDriver.getManagixHome() + File.separator
                 + InstallerDriver.ASTERIX_DIR + File.separator + asterixInstanceName);
         AsterixRuntimeState runtimeState = VerificationUtil.getAsterixRuntimeState(instance);
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
index 3b19372..52ee4b4 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
@@ -169,6 +169,10 @@
             case INACTIVE:
                 command = "start -n " + ASTERIX_INSTANCE_NAME;
                 break;
+            case UNUSABLE:
+                command = "delete -n " + ASTERIX_INSTANCE_NAME;
+                cmdHandler.processCommand(command.split(" "));
+                throw new Exception("Cluster state was Unusable");
         }
         cmdHandler.processCommand(command.split(" "));
     }
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 2d2c612..3e37694 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -54,8 +54,6 @@
 
     public static ClusterManager INSTANCE = new ClusterManager();
 
-    private static String eventsDir = System.getenv("user.dir") + File.separator + "eventrix";
-
     private static AsterixEventServiceClient client;
 
     private static ILookupService lookupService;
@@ -117,7 +115,8 @@
             String hostId = node.getId();
             String nodeControllerId = asterixInstanceName + "_" + node.getId();
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            Pattern startNC = PatternCreator.INSTANCE.createNCStartPattern(ccHost, hostId, nodeControllerId, iodevices, false);
+            Pattern startNC = PatternCreator.INSTANCE.createNCStartPattern(ccHost, hostId, nodeControllerId, iodevices,
+                    false);
             pattern.add(startNC);
             Patterns startNCPattern = new Patterns(pattern);
             client.submit(startNCPattern);
@@ -170,4 +169,8 @@
     public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers() {
         return eventSubscribers;
     }
+
+    public static ILookupService getLookupService() {
+        return lookupService;
+    }
 }