Added mechanism for an application to subscribe to cluster membership events
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index c4b7802..733c7d3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -27,11 +27,9 @@
 public interface ICCApplicationContext extends IApplicationContext {
     /**
      * Sets the state that must be distributed by the infrastructure to all the
-     * NC application contexts. Any state set by calling this method in
-     * the {@link ICCApplicationEntryPoint#start(ICCApplicationContext, String[])} call
-     * is made available to all the {@link INCApplicationContext} objects at each Node Controller.
-     * The state is then available to be inspected by the application at the NC during or
-     * after the {@link INCBootstrap#start()} call.
+     * NC application contexts. Any state set by calling this method in the {@link ICCApplicationEntryPoint#start(ICCApplicationContext, String[])} call is made available to all the {@link INCApplicationContext} objects
+     * at each Node Controller. The state is then available to be inspected by
+     * the application at the NC during or after the {@link INCBootstrap#start()} call.
      * 
      * @param state
      *            The distributed state
@@ -47,6 +45,14 @@
     public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
 
     /**
+     * A listener that listens to Cluster Lifecycle events at the Cluster
+     * Controller.
+     * 
+     * @param jobLifecycleListener
+     */
+    public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener);
+
+    /**
      * Get the Cluster Controller Context.
      * 
      * @return The Cluster Controller Context.
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 7e1581a..f884c6b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -19,9 +19,11 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
@@ -41,6 +43,7 @@
     protected IResultCallback<Object> deinitializationCallback;
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
+    private List<IClusterLifecycleListener> clusterLifecycleListeners;
 
     public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext) throws IOException {
         super(serverCtx);
@@ -48,6 +51,7 @@
         initPendingNodeIds = new HashSet<String>();
         deinitPendingNodeIds = new HashSet<String>();
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
+        clusterLifecycleListeners = new ArrayList<IClusterLifecycleListener>();
     }
 
     public ICCContext getCCContext() {
@@ -82,4 +86,21 @@
             l.notifyJobCreation(jobId, acggf);
         }
     }
+
+    @Override
+    public void addClusterLifecycleListener(IClusterLifecycleListener clusterLifecycleListener) {
+        clusterLifecycleListeners.add(clusterLifecycleListener);
+    }
+
+    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+        for (IClusterLifecycleListener l : clusterLifecycleListeners) {
+            l.notifyNodeJoin(nodeId, ncConfiguration);
+        }
+    }
+
+    public void notifyNodeFailure(Set<String> deadNodeIds) {
+        for (IClusterLifecycleListener l : clusterLifecycleListeners) {
+            l.notifyNodeFailure(deadNodeIds);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index f7dd1d2..03d43ed 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -47,6 +48,7 @@
 
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result = null;
+        Map<String, String> ncConfiguration = null;
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
 
@@ -58,6 +60,8 @@
             nodeMap.put(id, state);
             Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
             String ipAddress = state.getNCConfig().dataIPAddress;
+            ncConfiguration = new HashMap<String, String>();
+            state.getNCConfig().toMap(ncConfiguration);
             Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
             if (nodes == null) {
                 nodes = new HashSet<String>();
@@ -75,5 +79,6 @@
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }
         ncIPCHandle.send(-1, result, null);
+        ccs.getApplicationContext().notifyNodeJoin(id, ncConfiguration);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
index 7255503..c82e264 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -65,6 +65,9 @@
                 }
             }
         }
+        if (deadNodes != null && deadNodes.size() > 0) {
+            ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index ec29592..69844da 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -16,86 +16,107 @@
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 import org.kohsuke.args4j.Argument;
 import org.kohsuke.args4j.Option;
 import org.kohsuke.args4j.spi.StopOptionHandler;
 
 public class NCConfig implements Serializable {
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
-    public String ccHost;
+	@Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
+	public String ccHost;
 
-    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
-    public int ccPort = 1099;
+	@Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+	public int ccPort = 1099;
 
-    @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
-    public String clusterNetIPAddress;
+	@Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+	public String clusterNetIPAddress;
 
-    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
-    public String nodeId;
+	@Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
+	public String nodeId;
 
-    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
-    public String dataIPAddress;
+	@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+	public String dataIPAddress;
 
-    @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
-    public String datasetIPAddress;
+	@Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+	public String datasetIPAddress;
 
-    @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
-    public String ioDevices = System.getProperty("java.io.tmpdir");
+	@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
+	public String ioDevices = System.getProperty("java.io.tmpdir");
 
-    @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
-    public int nNetThreads = 1;
+	@Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+	public int nNetThreads = 1;
 
-    @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
-    public int maxMemory = -1;
+	@Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+	public int maxMemory = -1;
 
-    @Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
-    public int resultHistorySize = 100;
+	@Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
+	public int resultHistorySize = 100;
 
-    @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
-    public int resultManagerMemory = -1;
+	@Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+	public int resultManagerMemory = -1;
 
-    @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
-    public String appNCMainClass;
+	@Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
+	public String appNCMainClass;
 
-    @Argument
-    @Option(name = "--", handler = StopOptionHandler.class)
-    public List<String> appArgs;
+	@Argument
+	@Option(name = "--", handler = StopOptionHandler.class)
+	public List<String> appArgs;
 
-    public void toCommandLine(List<String> cList) {
-        cList.add("-cc-host");
-        cList.add(ccHost);
-        cList.add("-cc-port");
-        cList.add(String.valueOf(ccPort));
-        cList.add("-cluster-net-ip-address");
-        cList.add(clusterNetIPAddress);
-        cList.add("-node-id");
-        cList.add(nodeId);
-        cList.add("-data-ip-address");
-        cList.add(dataIPAddress);
-        cList.add(datasetIPAddress);
-        cList.add("-iodevices");
-        cList.add(ioDevices);
-        cList.add("-net-thread-count");
-        cList.add(String.valueOf(nNetThreads));
-        cList.add("-max-memory");
-        cList.add(String.valueOf(maxMemory));
-        cList.add("-result-history-size");
-        cList.add(String.valueOf(resultHistorySize));
-        cList.add("-result-manager-memory");
-        cList.add(String.valueOf(resultManagerMemory));
+	public void toCommandLine(List<String> cList) {
+		cList.add("-cc-host");
+		cList.add(ccHost);
+		cList.add("-cc-port");
+		cList.add(String.valueOf(ccPort));
+		cList.add("-cluster-net-ip-address");
+		cList.add(clusterNetIPAddress);
+		cList.add("-node-id");
+		cList.add(nodeId);
+		cList.add("-data-ip-address");
+		cList.add(dataIPAddress);
+		cList.add(datasetIPAddress);
+		cList.add("-iodevices");
+		cList.add(ioDevices);
+		cList.add("-net-thread-count");
+		cList.add(String.valueOf(nNetThreads));
+		cList.add("-max-memory");
+		cList.add(String.valueOf(maxMemory));
+		cList.add("-result-history-size");
+		cList.add(String.valueOf(resultHistorySize));
+		cList.add("-result-manager-memory");
+		cList.add(String.valueOf(resultManagerMemory));
 
-        if (appNCMainClass != null) {
-            cList.add("-app-nc-main-class");
-            cList.add(appNCMainClass);
-        }
-        if (appArgs != null && !appArgs.isEmpty()) {
-            cList.add("--");
-            for (String appArg : appArgs) {
-                cList.add(appArg);
-            }
-        }
-    }
+		if (appNCMainClass != null) {
+			cList.add("-app-nc-main-class");
+			cList.add(appNCMainClass);
+		}
+		if (appArgs != null && !appArgs.isEmpty()) {
+			cList.add("--");
+			for (String appArg : appArgs) {
+				cList.add(appArg);
+			}
+		}
+	}
+
+	public void toMap(Map<String, String> configuration) {
+		configuration.put("cc-host", ccHost);
+		configuration.put("cc-port", (String.valueOf(ccPort)));
+		configuration.put("cluster-net-ip-address", clusterNetIPAddress);
+		configuration.put("node-id", nodeId);
+		configuration.put("data-ip-address", dataIPAddress);
+		configuration.put("iodevices", ioDevices);
+		configuration.put("net-thread-count", String.valueOf(nNetThreads));
+		configuration.put("max-memory", String.valueOf(maxMemory));
+		configuration.put("result-history-size",
+				String.valueOf(resultHistorySize));
+		configuration.put("result-manager-memory",
+				String.valueOf(resultManagerMemory));
+
+		if (appNCMainClass != null) {
+			configuration.put("app-nc-main-class", appNCMainClass);
+		}
+
+	}
 }