checkpoint post merge from master
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e56ed92..9fcbb29 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -62,6 +62,9 @@
 import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactoryProvider;
 
 public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+
+    public static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR = createAsterixPropertiesAccessor();
+
     private static final int METADATA_IO_DEVICE_ID = 0;
 
     private final INCApplicationContext ncApplicationContext;
@@ -84,18 +87,26 @@
     private IIOManager ioManager;
     private boolean isShuttingdown;
 
-    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) throws AsterixException {
         this.ncApplicationContext = ncApplicationContext;
+        compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
+        externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
+        metadataProperties = new AsterixMetadataProperties(ASTERIX_PROPERTIES_ACCESSOR);
+        storageProperties = new AsterixStorageProperties(ASTERIX_PROPERTIES_ACCESSOR);
+        txnProperties = new AsterixTransactionProperties(ASTERIX_PROPERTIES_ACCESSOR);
+    }
+
+    private static AsterixPropertiesAccessor createAsterixPropertiesAccessor() {
+        AsterixPropertiesAccessor propertiesAccessor = null;
+        try {
+            propertiesAccessor = new AsterixPropertiesAccessor();
+        } catch (AsterixException e) {
+            throw new IllegalStateException("Unable to create properties accessor");
+        }
+        return propertiesAccessor;
     }
 
     public void initialize() throws IOException, ACIDException, AsterixException {
-        AsterixPropertiesAccessor propertiesAccessor = new AsterixPropertiesAccessor();
-        compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
-        externalProperties = new AsterixExternalProperties(propertiesAccessor);
-        metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
-        storageProperties = new AsterixStorageProperties(propertiesAccessor);
-        txnProperties = new AsterixTransactionProperties(propertiesAccessor);
-
         Logger.getLogger("edu.uci.ics").setLevel(externalProperties.getLogLevel());
 
         fileMapManager = new AsterixFileMapManager();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 97a536d..ed0cab1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -33,7 +33,6 @@
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
-import edu.uci.ics.asterix.metadata.feeds.FeedJobLifecycleListener;
 import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
@@ -71,9 +70,6 @@
         AsterixAppContextInfo.getInstance().getCCApplicationContext()
                 .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
 
-        AsterixAppContextInfo.getInstance().getCCApplicationContext()
-                .addClusterLifecycleListener(FeedLifecycleListener.INSTANCE);
-
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         setupWebServer(externalProperties);
         webServer.start();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index a864e61..941733e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -14,11 +14,22 @@
  */
 package edu.uci.ics.asterix.hyracks.bootstrap;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
 
@@ -37,6 +48,7 @@
             LOGGER.info("NC: " + nodeId + " joined");
         }
         AsterixClusterProperties.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+
     }
 
     public void notifyNodeFailure(Set<String> deadNodeIds) {
@@ -46,7 +58,58 @@
             }
             AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
         }
+        Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
+        Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
+        for (IClusterEventsSubscriber sub : subscribers) {
+            work.addAll(sub.notifyNodeFailure(deadNodeIds));
+        }
+
+        int nodesToAdd = 0;
+        Set<String> nodesToRemove = new HashSet<String>();
+        Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
+        Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+        for (IClusterManagementWork w : work) {
+            switch (w.getClusterManagementWorkType()) {
+                case ADD_NODE:
+                    if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
+                        nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+                    }
+                    nodeAdditionRequests.add(w);
+                    break;
+                case REMOVE_NODE:
+                    nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+                    nodeRemovalRequests.add(w);
+                    break;
+            }
+        }
+
+        Set<Node> addedNodes = new HashSet<Node>();
+        for (int i = 0; i < nodesToAdd; i++) {
+            Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+            if (node != null) {
+                try {
+                    ClusterManager.INSTANCE.addNode(node);
+                    addedNodes.add(node);
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Added NC at:" + node.getId());
+                    }
+                } catch (AsterixException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Unable to add NC at:" + node.getId());
+                    }
+                    e.printStackTrace();
+                }
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Unable to add NC: no more available nodes");
+                }
+            }
+        }
+
+        for (IClusterManagementWork w : nodeAdditionRequests) {
+            w.getSourceSubscriber().notifyRequestCompletion(
+                    new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
+        }
 
     }
-
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 069cdb6..e4ef300 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -63,6 +64,14 @@
         Runtime.getRuntime().addShutdownHook(sHook);
 
         runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+        AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getMetadataProperties();
+        if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId());
+            }
+            updateOnNodeJoin();
+        }
         runtimeContext.initialize();
         ncApplicationContext.setApplicationObject(runtimeContext);
 
@@ -115,25 +124,6 @@
         IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
-        if (!metadataProperties.getNodeNames().contains(nodeId)) {
-            metadataProperties.getNodeNames().add(nodeId);
-            Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
-            Node self = null;
-            for (Node node : cluster.getSubstituteNodes().getNode()) {
-                if (node.getId().equalsIgnoreCase(nodeId)) {
-                    String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
-                    metadataProperties.getStores().put(nodeId, storeDir.split(","));
-                    self = node;
-                    break;
-                }
-            }
-            if (self != null) {
-                cluster.getSubstituteNodes().getNode().remove(self);
-                cluster.getNode().add(self);
-            } else {
-                throw new IllegalStateException("Unknown node joining the cluster");
-            }
-        }
 
         if (systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -201,6 +191,47 @@
         }
     }
 
+    private void updateOnNodeJoin() {
+        AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getMetadataProperties();
+        AsterixTransactionProperties txnProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getTransactionProperties();
+        if (!metadataProperties.getNodeNames().contains(nodeId)) {
+            metadataProperties.getNodeNames().add(nodeId);
+            Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
+            String asterixInstanceName = cluster.getInstanceName();
+            Node self = null;
+            for (Node node : cluster.getSubstituteNodes().getNode()) {
+                String ncId = asterixInstanceName + "_" + node.getId();
+                if (ncId.equalsIgnoreCase(nodeId)) {
+                    String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+                    metadataProperties.getStores().put(nodeId, storeDir.split(","));
+
+                    String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+                    metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath);
+
+                    String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
+                    txnProperties.getLogDirectories().put(nodeId, txnLogDir);
+
+                    if (LOGGER.isLoggable(Level.INFO)) {
+                        LOGGER.info("Store set to : " + storeDir);
+                        LOGGER.info("Coredump dir set to : " + coredumpPath);
+                        LOGGER.info("Transaction log dir set to :" + txnLogDir);
+                    }
+                    self = node;
+                    break;
+                }
+            }
+            if (self != null) {
+                cluster.getSubstituteNodes().getNode().remove(self);
+                cluster.getNode().add(self);
+            } else {
+                throw new IllegalStateException("Unknown node joining the cluster");
+            }
+        }
+
+    }
+
     /**
      * Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} method.
      */
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
index 7bab09d..98c7c91 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
@@ -27,6 +27,9 @@
     private static final String EXTERNAL_APISERVER_KEY = "api.port";
     private static int EXTERNAL_APISERVER_DEFAULT = 19002;
 
+    private static final String EXTERNAL_CC_JAVA_OPTS_KEY = "cc.java.opts";
+    private static String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx1024m";
+
     private static final String EXTERNAL_NC_JAVA_OPTS_KEY = "nc.java.opts";
     private static String EXTERNAL_NC_JAVA_OPTS_DEFAULT = "-Xmx1024m";
 
@@ -53,4 +56,9 @@
         return accessor.getProperty(EXTERNAL_NC_JAVA_OPTS_KEY, EXTERNAL_NC_JAVA_OPTS_DEFAULT,
                 PropertyInterpreters.getStringPropertyInterpreter());
     }
+
+    public String getCCJavaParams() {
+        return accessor.getProperty(EXTERNAL_CC_JAVA_OPTS_KEY, EXTERNAL_CC_JAVA_OPTS_DEFAULT,
+                PropertyInterpreters.getStringPropertyInterpreter());
+    }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 73a1f46..1d68dfc 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -42,9 +42,12 @@
     public Set<String> getNodeNames() {
         return accessor.getNodeNames();
     }
-    
-    public String getCoredumpPath(String nodeId){
+
+    public String getCoredumpPath(String nodeId) {
         return accessor.getCoredumpPath(nodeId);
     }
 
+    public Map<String, String> getCoredumpPaths() {
+        return accessor.getCoredumpConfig();
+    }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 881bae2..7654aa3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -115,8 +115,26 @@
         return coredumpConfig.get(nodeId);
     }
 
-    public String getTransactionLogDir(String nodeId) {
-        return transactionLogDirs.get(nodeId);
+    public Map<String, String> getTransactionLogDirs() {
+        return transactionLogDirs;
+    }
+
+    public Map<String, String> getCoredumpConfig() {
+        return coredumpConfig;
+    }
+
+    public void putCoredumpPaths(String nodeId, String coredumpPath) {
+        if (coredumpConfig.containsKey(nodeId)) {
+            throw new IllegalStateException("Cannot override value for coredump path");
+        }
+        coredumpConfig.put(nodeId, coredumpPath);
+    }
+
+    public void putTransactionLogDir(String nodeId, String txnLogDir) {
+        if (transactionLogDirs.containsKey(nodeId)) {
+            throw new IllegalStateException("Cannot override value for txnLogDir");
+        }
+        transactionLogDirs.put(nodeId, txnLogDir);
     }
 
     public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
@@ -143,4 +161,5 @@
     public String getInstanceName() {
         return instanceName;
     }
+
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
index 6fa5beb..6094a27 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.asterix.common.config;
 
+import java.util.Map;
+
 public class AsterixTransactionProperties extends AbstractAsterixProperties {
 
     private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
@@ -57,7 +59,11 @@
     }
 
     public String getLogDirectory(String nodeId) {
-        return accessor.getTransactionLogDir(nodeId);
+        return accessor.getTransactionLogDirs().get(nodeId);
+    }
+
+    public Map<String, String> getLogDirectories() {
+        return accessor.getTransactionLogDirs();
     }
 
     public int getLogBufferNumPages() {
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 09b71d5..5192e06 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -3,7 +3,8 @@
 	xmlns:cl="cluster" targetNamespace="cluster" elementFormDefault="qualified">
 
 	<!-- definition of simple types -->
-	<xs:element name="name" type="xs:string" />
+	<xs:element name="instance_name" type="xs:string" />
+	<xs:element name="cluster_name" type="xs:string" />
 	<xs:element name="log_dir" type="xs:string" />
 	<xs:element name="txn_log_dir" type="xs:string" />
 	<xs:element name="id" type="xs:string" />
@@ -40,9 +41,9 @@
 				<xs:element ref="cl:cluster_ip" />
 				<xs:element ref="cl:java_home" minOccurs="0" />
 				<xs:element ref="cl:log_dir" minOccurs="0" />
-				<xs:element ref="cl:client_port"  />
-				<xs:element ref="cl:cluster_port"  />
-				<xs:element ref="cl:http_port"  />
+				<xs:element ref="cl:client_port" />
+				<xs:element ref="cl:cluster_port" />
+				<xs:element ref="cl:http_port" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
@@ -89,7 +90,8 @@
 	<xs:element name="cluster">
 		<xs:complexType>
 			<xs:sequence>
-				<xs:element ref="cl:name" />
+				<xs:element ref="cl:instance_name" />
+				<xs:element ref="cl:cluster_name" />
 				<xs:element ref="cl:username" />
 				<xs:element ref="cl:env" minOccurs="0" />
 				<xs:element ref="cl:java_home" minOccurs="0" />
@@ -100,7 +102,7 @@
 				<xs:element ref="cl:working_dir" />
 				<xs:element ref="cl:master_node" />
 				<xs:element ref="cl:node" maxOccurs="unbounded" />
-				<xs:element ref="cl:substitute_nodes"/>
+				<xs:element ref="cl:substitute_nodes" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
index 5fcc5cf..c643c42 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
@@ -29,97 +29,107 @@
 
 public class VerificationUtil {
 
-    private static final String VERIFY_SCRIPT_PATH = AsterixEventService.getEventHome() + File.separator + "scripts" + File.separator + "verify.sh";
+	private static final String VERIFY_SCRIPT_PATH = AsterixEventService
+			.getEventHome()
+			+ File.separator
+			+ "scripts"
+			+ File.separator
+			+ "verify.sh";
 
-    public static AsterixRuntimeState getAsterixRuntimeState(AsterixInstance instance) throws Exception {
+	public static AsterixRuntimeState getAsterixRuntimeState(
+			AsterixInstance instance) throws Exception {
 
-        Cluster cluster = instance.getCluster();
-        List<String> args = new ArrayList<String>();
-        args.add(instance.getName());
-        args.add(instance.getCluster().getMasterNode().getClusterIp());
-        for (Node node : cluster.getNode()) {
-            args.add(node.getClusterIp());
-            args.add(instance.getName() + "_" + node.getId());
-        }
-        Thread.sleep(2000);
-        String output = AsterixEventServiceUtil.executeLocalScript(VERIFY_SCRIPT_PATH, args);
-        boolean ccRunning = true;
-        List<String> failedNCs = new ArrayList<String>();
-        String[] infoFields;
-        ProcessInfo pInfo;
-        List<ProcessInfo> processes = new ArrayList<ProcessInfo>();
+		Cluster cluster = instance.getCluster();
+		List<String> args = new ArrayList<String>();
+		args.add(instance.getName());
+		args.add(instance.getCluster().getMasterNode().getClusterIp());
+		for (Node node : cluster.getNode()) {
+			args.add(node.getClusterIp());
+			args.add(instance.getName() + "_" + node.getId());
+		}
+		Thread.sleep(2000);
+		String output = AsterixEventServiceUtil.executeLocalScript(
+				VERIFY_SCRIPT_PATH, args);
+		boolean ccRunning = true;
+		List<String> failedNCs = new ArrayList<String>();
+		String[] infoFields;
+		ProcessInfo pInfo;
+		List<ProcessInfo> processes = new ArrayList<ProcessInfo>();
 
-        for (String line : output.split("\n")) {
-            String nodeid = null;
-            infoFields = line.split(":");
-            try {
-                int pid = Integer.parseInt(infoFields[3]);
-                if (infoFields[0].equals("NC")) {
-                    nodeid = infoFields[2].split("_")[1];
-                } else {
-                    nodeid = instance.getCluster().getMasterNode().getId();
-                }
-                pInfo = new ProcessInfo(infoFields[0], infoFields[1], nodeid, pid);
-                processes.add(pInfo);
-            } catch (Exception e) {
-                if (infoFields[0].equalsIgnoreCase("CC")) {
-                    ccRunning = false;
-                } else {
-                    failedNCs.add(infoFields[1]);
-                }
-            }
-        }
-        return new AsterixRuntimeState(processes, failedNCs, ccRunning);
-    }
+		for (String line : output.split("\n")) {
+			String nodeid = null;
+			infoFields = line.split(":");
+			try {
+				int pid = Integer.parseInt(infoFields[3]);
+				if (infoFields[0].equals("NC")) {
+					nodeid = infoFields[2].split("_")[1];
+				} else {
+					nodeid = instance.getCluster().getMasterNode().getId();
+				}
+				pInfo = new ProcessInfo(infoFields[0], infoFields[1], nodeid,
+						pid);
+				processes.add(pInfo);
+			} catch (Exception e) {
+				if (infoFields[0].equalsIgnoreCase("CC")) {
+					ccRunning = false;
+				} else {
+					failedNCs.add(infoFields[1]);
+				}
+			}
+		}
+		return new AsterixRuntimeState(processes, failedNCs, ccRunning);
+	}
 
-    public static void updateInstanceWithRuntimeDescription(AsterixInstance instance, AsterixRuntimeState state,
-            boolean expectedRunning) {
-        StringBuffer summary = new StringBuffer();
-        if (expectedRunning) {
-            if (!state.isCcRunning()) {
-                summary.append("Cluster Controller not running at " + instance.getCluster().getMasterNode().getId()
-                        + "\n");
-                instance.setState(State.UNUSABLE);
-            }
-            if (state.getFailedNCs() != null && !state.getFailedNCs().isEmpty()) {
-                summary.append("Node Controller not running at the following nodes" + "\n");
-                for (String failedNC : state.getFailedNCs()) {
-                    summary.append(failedNC + "\n");
-                }
-                instance.setState(State.UNUSABLE);
-            }
-            if (!(instance.getState().equals(State.UNUSABLE))) {
-                instance.setState(State.ACTIVE);
-            }
-        } else {
-            if (state.getProcesses() != null && state.getProcesses().size() > 0) {
-                summary.append("Following process still running " + "\n");
-                for (ProcessInfo pInfo : state.getProcesses()) {
-                    summary.append(pInfo + "\n");
-                }
-                instance.setState(State.UNUSABLE);
-            } else {
-                instance.setState(State.INACTIVE);
-            }
-        }
-        state.setSummary(summary.toString());
-        instance.setAsterixRuntimeStates(state);
-    }
+	public static void updateInstanceWithRuntimeDescription(
+			AsterixInstance instance, AsterixRuntimeState state,
+			boolean expectedRunning) {
+		StringBuffer summary = new StringBuffer();
+		if (expectedRunning) {
+			if (!state.isCcRunning()) {
+				summary.append("Cluster Controller not running at "
+						+ instance.getCluster().getMasterNode().getId() + "\n");
+				instance.setState(State.UNUSABLE);
+			}
+			if (state.getFailedNCs() != null && !state.getFailedNCs().isEmpty()) {
+				summary.append("Node Controller not running at the following nodes"
+						+ "\n");
+				for (String failedNC : state.getFailedNCs()) {
+					summary.append(failedNC + "\n");
+				}
+				// instance.setState(State.UNUSABLE);
+			}
+			if (!(instance.getState().equals(State.UNUSABLE))) {
+				// instance.setState(State.ACTIVE);
+			}
+		} else {
+			if (state.getProcesses() != null && state.getProcesses().size() > 0) {
+				summary.append("Following process still running " + "\n");
+				for (ProcessInfo pInfo : state.getProcesses()) {
+					summary.append(pInfo + "\n");
+				}
+				// instance.setState(State.UNUSABLE);
+			} else {
+				// instance.setState(State.INACTIVE);
+			}
+		}
+		state.setSummary(summary.toString());
+		instance.setAsterixRuntimeStates(state);
+	}
 
-    public static void verifyBackupRestoreConfiguration(String hdfsUrl, String hadoopVersion, String hdfsBackupDir)
-            throws Exception {
-        StringBuffer errorCheck = new StringBuffer();
-        if (hdfsUrl == null || hdfsUrl.length() == 0) {
-            errorCheck.append("\n HDFS Url not configured");
-        }
-        if (hadoopVersion == null || hadoopVersion.length() == 0) {
-            errorCheck.append("\n HDFS version not configured");
-        }
-        if (hdfsBackupDir == null || hdfsBackupDir.length() == 0) {
-            errorCheck.append("\n HDFS backup directory not configured");
-        }
-        if (errorCheck.length() > 0) {
-            throw new Exception("Incomplete hdfs configuration" + errorCheck);
-        }
-    }
+	public static void verifyBackupRestoreConfiguration(String hdfsUrl,
+			String hadoopVersion, String hdfsBackupDir) throws Exception {
+		StringBuffer errorCheck = new StringBuffer();
+		if (hdfsUrl == null || hdfsUrl.length() == 0) {
+			errorCheck.append("\n HDFS Url not configured");
+		}
+		if (hadoopVersion == null || hadoopVersion.length() == 0) {
+			errorCheck.append("\n HDFS version not configured");
+		}
+		if (hdfsBackupDir == null || hdfsBackupDir.length() == 0) {
+			errorCheck.append("\n HDFS backup directory not configured");
+		}
+		if (errorCheck.length() > 0) {
+			throw new Exception("Incomplete hdfs configuration" + errorCheck);
+		}
+	}
 }
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java
index b5e2460..87f14b0 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java
@@ -30,7 +30,7 @@
     private static final long serialVersionUID = 1L;
 
     private static final int WEB_INTERFACE_PORT_DEFAULT = 19001;
-    
+
     public enum State {
         ACTIVE,
         INACTIVE,
@@ -61,7 +61,6 @@
         this.asterixVersion = asterixVersion;
         this.createdTimestamp = new Date();
         this.backupInfo = new ArrayList<BackupInfo>();
-
     }
 
     public Date getModifiedTimestamp() {
@@ -212,4 +211,5 @@
     public void setAsterixConfiguration(AsterixConfiguration asterixConfiguration) {
         this.asterixConfiguration = asterixConfiguration;
     }
+
 }
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index 1bca9e9..b9ed170 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -95,11 +95,25 @@
             JAXBException, EventException {
 
         String modifiedZipPath = injectAsterixPropertyFile(AsterixEventService.getAsterixZip(), asterixInstance);
-        modifiedZipPath = injectAsterixLogPropertyFile(modifiedZipPath, asterixInstance);
-        modifiedZipPath = injectAsterixClusterConfigurationFile(modifiedZipPath, asterixInstance);
+        injectAsterixClusterConfigurationFile(modifiedZipPath, asterixInstance);
     }
 
     public static void createClusterProperties(Cluster cluster, AsterixConfiguration asterixConfiguration) {
+
+        String ccJavaOpts = null;
+        String ncJavaOpts = null;
+        for (edu.uci.ics.asterix.common.configuration.Property property : asterixConfiguration.getProperty()) {
+            if (property.getName().equalsIgnoreCase(EventUtil.CC_JAVA_OPTS)) {
+                ccJavaOpts = property.getValue();
+            } else if (property.getName().equalsIgnoreCase(EventUtil.NC_JAVA_OPTS)) {
+                ncJavaOpts = property.getValue();
+            }
+        }
+
+        poulateClusterEnvironmentProperties(cluster, ccJavaOpts, ncJavaOpts);
+    }
+
+    public static void poulateClusterEnvironmentProperties(Cluster cluster, String ccJavaOpts, String ncJavaOpts) {
         List<Property> clusterProperties = null;
         if (cluster.getEnv() != null && cluster.getEnv().getProperty() != null) {
             clusterProperties = cluster.getEnv().getProperty();
@@ -107,13 +121,9 @@
         } else {
             clusterProperties = new ArrayList<Property>();
         }
-        for (edu.uci.ics.asterix.common.configuration.Property property : asterixConfiguration.getProperty()) {
-            if (property.getName().equalsIgnoreCase(EventUtil.CC_JAVA_OPTS)) {
-                clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, property.getValue()));
-            } else if (property.getName().equalsIgnoreCase(EventUtil.NC_JAVA_OPTS)) {
-                clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, property.getValue()));
-            }
-        }
+
+        clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
+        clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
         clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
                 + "asterix"));
         clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
@@ -136,10 +146,6 @@
         cluster.setEnv(new Env(clusterProperties));
     }
 
-    public static void poulateClusterEnvironmentProperties(Cluster cluster) {
-
-    }
-
     private static String injectAsterixPropertyFile(String origZipFile, AsterixInstance asterixInstance)
             throws IOException, JAXBException {
         writeAsterixConfigurationFile(asterixInstance);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index a789a20..0223700 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -50,7 +50,6 @@
     protected boolean alterRequested = false;
     private Map<String, Object> modifiedConfiguration = null;
     private long tupleCount = 0;
-    private FeedPolicyEnforcer policyEnforcer;
 
     public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
 
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
index 7784d79..5b024ec 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
@@ -51,13 +51,15 @@
         AsterixEventServiceUtil.validateAsterixInstanceNotExists(asterixInstanceName);
         CreateConfig createConfig = (CreateConfig) config;
         cluster = EventUtil.getCluster(createConfig.clusterPath);
+        cluster.setInstanceName(asterixInstanceName);
         asterixConfiguration = InstallerUtil.getAsterixConfiguration(createConfig.asterixConfPath);
         AsterixInstance asterixInstance = AsterixEventServiceUtil.createAsterixInstance(asterixInstanceName, cluster,
                 asterixConfiguration);
         AsterixEventServiceUtil.evaluateConflictWithOtherInstances(asterixInstance);
         AsterixEventServiceUtil.createAsterixZip(asterixInstance);
         AsterixEventServiceUtil.createClusterProperties(cluster, asterixConfiguration);
-        AsterixEventServiceClient eventrixClient = AsterixEventService.getAsterixEventServiceClient(cluster, true, false);
+        AsterixEventServiceClient eventrixClient = AsterixEventService.getAsterixEventServiceClient(cluster, true,
+                false);
 
         Patterns asterixBinarytrasnferPattern = PatternCreator.INSTANCE.getAsterixBinaryTransferPattern(
                 asterixInstanceName, cluster);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
new file mode 100644
index 0000000..39f9927
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.asterix.metadata.api;
+
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Set;
+
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+
+public interface IClusterEventsSubscriber {
+
+    /**
+     * @param deadNodeIds
+     * @return
+     */
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
+
+    /**
+     * @param joinedNodeId
+     * @return
+     */
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
+
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response);
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
new file mode 100644
index 0000000..65ac354
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.api;
+
+public interface IClusterManagementWork {
+
+    public enum WorkType {
+        ADD_NODE,
+        REMOVE_NODE
+    }
+
+    public WorkType getClusterManagementWorkType();
+
+    public int getWorkId();
+
+    public IClusterEventsSubscriber getSourceSubscriber();
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
index 386d89e..ea07a62 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
@@ -1,5 +1,7 @@
 package edu.uci.ics.asterix.metadata.api;
 
+import java.util.Set;
+
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
 
@@ -17,4 +19,20 @@
      */
     public void removeNode(Node node) throws AsterixException;
 
+    /**
+     * @param subscriber
+     */
+    public void registerSubscriber(IClusterEventsSubscriber subscriber);
+
+    /**
+     * @param sunscriber
+     * @return
+     */
+    public boolean deregisterSubscriber(IClusterEventsSubscriber sunscriber);
+
+    /**
+     * @return
+     */
+    public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers();
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
new file mode 100644
index 0000000..bde860f
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+
+public abstract class AbstractClusterManagementWork implements IClusterManagementWork {
+
+    protected final IClusterEventsSubscriber subscriber;
+
+    @Override
+    public int getWorkId() {
+        return WorkIdGenerator.getNextWorkId();
+    }
+
+    public AbstractClusterManagementWork(IClusterEventsSubscriber subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    private static class WorkIdGenerator {
+        private static AtomicInteger workId = new AtomicInteger(0);
+
+        public static int getNextWorkId() {
+            return workId.incrementAndGet();
+        }
+
+    }
+
+    @Override
+    public IClusterEventsSubscriber getSourceSubscriber() {
+        return subscriber;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
new file mode 100644
index 0000000..4344ac8
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+
+public class AddNodeWork extends AbstractClusterManagementWork {
+
+    private final int numberOfNodes;
+
+    @Override
+    public WorkType getClusterManagementWorkType() {
+        return WorkType.ADD_NODE;
+    }
+
+    public AddNodeWork(int numberOfNodes, IClusterEventsSubscriber subscriber) {
+        super(subscriber);
+        this.numberOfNodes = numberOfNodes;
+    }
+
+    public int getNumberOfNodes() {
+        return numberOfNodes;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
new file mode 100644
index 0000000..fca9990
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.Set;
+
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+
+public class AddNodeWorkResponse extends ClusterManagementWorkResponse {
+
+    private final Set<Node> nodesAdded;
+
+    public AddNodeWorkResponse(AddNodeWork w, Status status, Set<Node> nodesAdded) {
+        super(w, status);
+        this.nodesAdded = nodesAdded;
+    }
+
+    public Set<Node> getNodesAdded() {
+        return nodesAdded;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
new file mode 100644
index 0000000..e720c9f
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+
+public class ClusterManagementWorkResponse implements IClusterManagementWorkResponse {
+
+    protected final IClusterManagementWork work;
+
+    protected final Status status;
+
+    public ClusterManagementWorkResponse(IClusterManagementWork w, Status status) {
+        this.work = w;
+        this.status = status;
+    }
+
+    public IClusterManagementWork getWork() {
+        return work;
+    }
+
+    public Status getStatus() {
+        return status;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index b73b861..dc072d4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -2,32 +2,48 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
+import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.event.management.AsterixEventServiceClient;
+import edu.uci.ics.asterix.event.model.AsterixInstance;
 import edu.uci.ics.asterix.event.schema.cluster.Cluster;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
 import edu.uci.ics.asterix.event.schema.pattern.Pattern;
 import edu.uci.ics.asterix.event.schema.pattern.Patterns;
 import edu.uci.ics.asterix.event.service.AsterixEventService;
+import edu.uci.ics.asterix.event.service.AsterixEventServiceUtil;
+import edu.uci.ics.asterix.event.service.ILookupService;
+import edu.uci.ics.asterix.event.service.ServiceProvider;
 import edu.uci.ics.asterix.event.util.PatternCreator;
 import edu.uci.ics.asterix.installer.schema.conf.Configuration;
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
 import edu.uci.ics.asterix.metadata.api.IClusterManager;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 
 public class ClusterManager implements IClusterManager {
 
+    private static final Logger LOGGER = Logger.getLogger(AsterixEventServiceClient.class.getName());
+
     public static ClusterManager INSTANCE = new ClusterManager();
 
     private static String eventsDir = System.getenv("user.dir") + File.separator + "eventrix";
 
     private static AsterixEventServiceClient client;
 
+    private static ILookupService lookupService;
+
+    private static final Set<IClusterEventsSubscriber> eventSubscribers = new HashSet<IClusterEventsSubscriber>();
+
     private ClusterManager() {
         Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
         String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
@@ -41,6 +57,19 @@
             configuration = (Configuration) unmarshaller.unmarshal(configFile);
             AsterixEventService.initialize(configuration, asterixDir, eventHome);
             client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
+
+            lookupService = ServiceProvider.INSTANCE.getLookupService();
+            if (!lookupService.isRunning(configuration)) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Lookup service not running. Starting lookup service ...");
+                }
+                lookupService.startService(configuration);
+            } else {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Lookup service running");
+                }
+            }
+
         } catch (Exception e) {
             throw new IllegalStateException("Unable to initialize cluster manager" + e);
         }
@@ -57,6 +86,10 @@
             cluster.getNode().add(node);
             client.submit(prepareNode);
 
+            AsterixExternalProperties externalProps = AsterixAppContextInfo.getInstance().getExternalProperties();
+            AsterixEventServiceUtil.poulateClusterEnvironmentProperties(cluster, externalProps.getCCJavaParams(),
+                    externalProps.getNCJavaParams());
+
             pattern.clear();
             String ccHost = cluster.getMasterNode().getClusterIp();
             String hostId = node.getId();
@@ -66,6 +99,12 @@
             pattern.add(startNC);
             Patterns startNCPattern = new Patterns(pattern);
             client.submit(startNCPattern);
+
+            AsterixInstance instance = lookupService.getAsterixInstance(cluster.getInstanceName());
+            instance.getCluster().getNode().add(node);
+            instance.getCluster().getSubstituteNodes().getNode().remove(node);
+            lookupService.updateAsterixInstance(instance);
+
         } catch (Exception e) {
             throw new AsterixException(e);
         }
@@ -79,7 +118,21 @@
 
     private List<Pattern> getRemoveNodePattern(Node node) {
         List<Pattern> pattern = new ArrayList<Pattern>();
-
         return pattern;
     }
+
+    @Override
+    public void registerSubscriber(IClusterEventsSubscriber subscriber) {
+        eventSubscribers.add(subscriber);
+    }
+
+    @Override
+    public boolean deregisterSubscriber(IClusterEventsSubscriber subscriber) {
+        return eventSubscribers.remove(subscriber);
+    }
+
+    @Override
+    public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers() {
+        return eventSubscribers;
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
new file mode 100644
index 0000000..ae3ce7e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+public interface IClusterManagementWorkResponse {
+
+    public enum Status {
+        SUCCESS,
+        FAILURE
+    }
+    
+    
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
new file mode 100644
index 0000000..0b15df7
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.Set;
+
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+
+public class RemoveNodeWork extends AbstractClusterManagementWork {
+
+    private final Set<String> nodesToBeRemoved;
+
+    @Override
+    public WorkType getClusterManagementWorkType() {
+        return WorkType.REMOVE_NODE;
+    }
+
+    public RemoveNodeWork(Set<String> nodesToBeRemoved, IClusterEventsSubscriber subscriber) {
+        super(subscriber);
+        this.nodesToBeRemoved = nodesToBeRemoved;
+    }
+
+    public Set<String> getNodesToBeRemoved() {
+        return nodesToBeRemoved;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
new file mode 100644
index 0000000..48445ef
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.Set;
+
+public class RemoveNodeWorkResponse extends ClusterManagementWorkResponse {
+
+    private final Set<String> nodesRemoved;
+
+    public RemoveNodeWorkResponse(AddNodeWork w, Status status, Set<String> nodesRemoved) {
+        super(w, status);
+        this.nodesRemoved = nodesRemoved;
+    }
+
+    public Set<String> getNodesAdded() {
+        return nodesRemoved;
+    }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 9556ae0..3aa6b05 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -18,7 +18,7 @@
 
     public static final FeedPolicy[] policies = new FeedPolicy[] { MISSTION_CRITICAL, ADVANCED, BASIC_MONITORED, BASIC };
 
-    public static final FeedPolicy DEFAULT_POLICY = BASIC_MONITORED;
+    public static final FeedPolicy DEFAULT_POLICY = ADVANCED;
 
     public static final String CONFIG_FEED_POLICY_KEY = "policy";
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
index ffa6c7e..bd154e6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
@@ -20,6 +20,7 @@
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -30,14 +31,19 @@
 import edu.uci.ics.asterix.metadata.MetadataException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
-import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -51,9 +57,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 
-//import edu.uci.ics.hyracks.api.job.JobInfo;
-
-public class FeedLifecycleListener implements IJobLifecycleListener, IClusterLifecycleListener, Serializable {
+public class FeedLifecycleListener implements IJobLifecycleListener, IClusterEventsSubscriber, Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -69,6 +73,7 @@
         feedFailureNotificationHandler = new FeedFailureHandler(failureEventInbox);
         new Thread(feedJobNotificationHandler).start();
         new Thread(feedFailureNotificationHandler).start();
+        ClusterManager.INSTANCE.registerSubscriber(this);
     }
 
     private final FeedJobNotificationHandler feedJobNotificationHandler;
@@ -313,13 +318,7 @@
     }
 
     @Override
-    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void notifyNodeFailure(Set<String> deadNodeIds) {
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
         Collection<FeedInfo> feedInfos = feedJobNotificationHandler.registeredFeeds.values();
         FeedFailureReport failureReport = new FeedFailureReport();
         for (FeedInfo feedInfo : feedInfos) {
@@ -342,7 +341,40 @@
                 }
             }
         }
-        failureEventInbox.add(failureReport);
+        return handleFailure(failureReport);
+    }
+
+    private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
+        Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
+        Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
+        for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
+            FeedInfo feedInfo = entry.getKey();
+            List<FeedFailure> feedFailures = entry.getValue();
+            for (FeedFailure feedFailure : feedFailures) {
+                switch (feedFailure.failureType) {
+                    case COMPUTE_NODE:
+                    case INGESTION_NODE:
+                        Map<FeedInfo, List<FailureType>> failuresBecauseOfThisNode = failureMap.get(feedFailure.nodeId);
+                        if (failuresBecauseOfThisNode == null) {
+                            failuresBecauseOfThisNode = new HashMap<FeedInfo, List<FailureType>>();
+                            failuresBecauseOfThisNode.put(feedInfo, new ArrayList<FailureType>());
+                            failureMap.put(feedFailure.nodeId, failuresBecauseOfThisNode);
+                        }
+                        List<FailureType> feedF = failuresBecauseOfThisNode.get(feedInfo);
+                        if (feedF == null) {
+                            feedF = new ArrayList<FailureType>();
+                            failuresBecauseOfThisNode.put(feedInfo, feedF);
+                        }
+                        feedF.add(feedFailure.failureType);
+                        break;
+                    case STORAGE_NODE:
+                }
+            }
+        }
+
+        AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
+        work.add(addNodesWork);
+        return work;
     }
 
     public static class FeedFailure {
@@ -361,4 +393,16 @@
             this.nodeId = nodeId;
         }
     }
+
+    @Override
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        // TODO Auto-generated method stub
+
+    }
 }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 7daf04c..6285385 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -14,12 +14,17 @@
  */
 package edu.uci.ics.asterix.om.util;
 
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+
 import edu.uci.ics.asterix.event.schema.cluster.Cluster;
 import edu.uci.ics.asterix.event.schema.cluster.Node;
 
@@ -29,78 +34,84 @@
 
 public class AsterixClusterProperties {
 
-	private static final Logger LOGGER = Logger
-			.getLogger(AsterixClusterProperties.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
 
-	private static final String IO_DEVICES = "iodevices";
+    private static final String IO_DEVICES = "iodevices";
 
-	public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
+    public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
 
-	private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
+    private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
 
-	private final Cluster cluster;
+    public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
+    private final Cluster cluster;
+   
+    private AsterixClusterProperties() {
+        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
+        try {
+            JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+            Unmarshaller unmarshaller = ctx.createUnmarshaller();
+            cluster = (Cluster) unmarshaller.unmarshal(is);
+        
+        } catch (JAXBException e) {
+            throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
+        }
+    }
 
-	private AsterixClusterProperties() {
-		cluster = null;
-	}
+    public enum State {
+        ACTIVE,
+        UNUSABLE
+    }
 
-	public enum State {
-		ACTIVE, UNUSABLE
-	}
+    private State state = State.UNUSABLE;
 
-	private State state = State.UNUSABLE;
+    public void removeNCConfiguration(String nodeId) {
+        state = State.UNUSABLE;
+        ncConfiguration.remove(nodeId);
+    }
 
-	public void removeNCConfiguration(String nodeId) {
-		state = State.UNUSABLE;
-		ncConfiguration.remove(nodeId);
-	}
+    public void addNCConfiguration(String nodeId, Map<String, String> configuration) {
+        ncConfiguration.put(nodeId, configuration);
+        if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
+                .getNodeNames().size()) {
+            state = State.ACTIVE;
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" Registering configuration parameters for node id" + nodeId);
+        }
+    }
 
-	public void addNCConfiguration(String nodeId,
-			Map<String, String> configuration) {
-		ncConfiguration.put(nodeId, configuration);
-		if (ncConfiguration.keySet().size() == AsterixAppContextInfo
-				.getInstance().getMetadataProperties().getNodeNames().size()) {
-			state = State.ACTIVE;
-		}
-		if (LOGGER.isLoggable(Level.INFO)) {
-			LOGGER.info(" Registering configuration parameters for node id"
-					+ nodeId);
-		}
-	}
+    /**
+     * Returns the number of IO devices configured for a Node Controller
+     * 
+     * @param nodeId
+     *            unique identifier of the Node Controller
+     * @return number of IO devices. -1 if the node id is not valid. A node id
+     *         is not valid if it does not correspond to the set of registered
+     *         Node Controllers.
+     */
+    public int getNumberOfIODevices(String nodeId) {
+        Map<String, String> ncConfig = ncConfiguration.get(nodeId);
+        if (ncConfig == null) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Configuration parameters for nodeId" + nodeId
+                        + " not found. The node has not joined yet or has left.");
+            }
+            return -1;
+        }
+        return ncConfig.get(IO_DEVICES).split(",").length;
+    }
 
-	/**
-	 * Returns the number of IO devices configured for a Node Controller
-	 * 
-	 * @param nodeId
-	 *            unique identifier of the Node Controller
-	 * @return number of IO devices. -1 if the node id is not valid. A node id
-	 *         is not valid if it does not correspond to the set of registered
-	 *         Node Controllers.
-	 */
-	public int getNumberOfIODevices(String nodeId) {
-		Map<String, String> ncConfig = ncConfiguration.get(nodeId);
-		if (ncConfig == null) {
-			if (LOGGER.isLoggable(Level.WARNING)) {
-				LOGGER.warning("Configuration parameters for nodeId"
-						+ nodeId
-						+ " not found. The node has not joined yet or has left.");
-			}
-			return -1;
-		}
-		return ncConfig.get(IO_DEVICES).split(",").length;
-	}
+    public State getState() {
+        return state;
+    }
 
-	public State getState() {
-		return state;
-	}
+    public Cluster getCluster() {
+        return cluster;
+    }
 
-	public Cluster getCluster() {
-		return cluster;
-	}
-
-	public Node getAvailableSubstitutionNode() {
-		List<Node> subNodes = cluster.getSubstituteNodes().getNode();
-		return subNodes.isEmpty() ? null : subNodes.remove(0);
-	}
+    public Node getAvailableSubstitutionNode() {
+        List<Node> subNodes = cluster.getSubstituteNodes().getNode();
+        return subNodes.isEmpty() ? null : subNodes.remove(0);
+    }
 
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 33fa168..63ae8f2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -29,9 +29,9 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 /**
- * TPS can be configured between 1 and 20,000  
+ * TPS can be configured between 1 and 20,000
+ * 
  * @author ramang
- *
  */
 public class SyntheticTwitterFeedAdapter extends PullBasedAdapter {
 
@@ -101,13 +101,20 @@
         private ARecordType outputRecordType;
         private int partition;
         private int tweetCount = 0;
+        private int tweetCountBeforeException = 0;
+        private int exceptionPeriod = -1;
 
-        public SyntheticTwitterFeedClient(Map<String, Object> configuration, ARecordType outputRecordType,
-                int partition) throws AsterixException {
+        public SyntheticTwitterFeedClient(Map<String, Object> configuration, ARecordType outputRecordType, int partition)
+                throws AsterixException {
             this.outputRecordType = outputRecordType;
-            String value = (String)configuration.get(KEY_DURATION);
+            String value = (String) configuration.get(KEY_DURATION);
             duration = value != null ? Integer.parseInt(value) : 60;
-            initializeTweetRate((String)configuration.get(KEY_TPS));
+            initializeTweetRate((String) configuration.get(KEY_TPS));
+            value = (String) configuration.get(KEY_EXCEPTION_PERIOD);
+            if (value != null) {
+                exceptionPeriod = Integer.parseInt(value);
+            }
+
             InitializationInfo info = new InitializationInfo();
             info.timeDurationInSecs = duration;
             DataGenerator.initialize(info);
@@ -206,6 +213,12 @@
                     tweetCount = 0;
                 }
             }
+            tweetCountBeforeException++;
+
+            if (tweetCountBeforeException == exceptionPeriod) {
+                tweetCountBeforeException = 0;
+                throw new AsterixException("Delibrate exception");
+            }
             return InflowState.DATA_AVAILABLE;
         }