Add Shutdown via API to Hyracks

This change adds a method to HyracksConnection called stopCluster().
When the CC recieves a message from this, it asks all NC tasks to close
and acknowledge that they have recieved the message and are closing.
If all NCs have closed, or a 10 second timeout elapses, the CC then
exits with a 0 return code if all NCs closed, or a 1 if some did
not acknowledge the shutdown request.

Change-Id: Iaf3d395dc7964e114d4929830f40063f58e0d5da
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/76
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vinayak Borkar <vinayakb@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 9ff741e..2c76fc1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -40,7 +40,8 @@
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO,
         CLI_DEPLOY_BINARY,
-        CLI_UNDEPLOY_BINARY
+        CLI_UNDEPLOY_BINARY,
+        CLUSTER_SHUTDOWN
     }
 
     public abstract static class Function implements Serializable {
@@ -279,4 +280,14 @@
             return deploymentId;
         }
     }
+
+    public static class ClusterShutdownFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLUSTER_SHUTDOWN;
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 98f27f2..176c930 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 
 public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
     private final IIPCHandle ipcHandle;
@@ -111,4 +112,19 @@
                 jobId);
         return (JobInfo) rpci.call(ipcHandle, gjsf);
     }
+
+    @Override
+    public void stopCluster() throws Exception {
+        HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf = new HyracksClientInterfaceFunctions.ClusterShutdownFunction();
+        rpci.call(ipcHandle, csdf);
+        //give the CC some time to do final settling after it returns our request
+        for (int i = 3; ipcHandle.isConnected() && i > 0; i--) {
+            synchronized (this) {
+                wait(3000l); //3sec
+            }
+        }
+        if (ipcHandle.isConnected()) {
+            throw new IPCException("CC refused to release connection after 9 seconds");
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 1916360..e6bfe47 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -182,4 +182,8 @@
     public JobInfo getJobInfo(JobId jobId) throws Exception {
         return hci.getJobInfo(jobId);
     }
+    @Override
+    public void stopCluster() throws Exception{
+        hci.stopCluster();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 1e44e91..6d1e5a2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -179,4 +179,9 @@
     public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
             EnumSet<JobFlag> jobFlags) throws Exception;
 
+    /**
+     * Shuts down all NCs and then the CC.
+     */
+    public void stopCluster() throws Exception;
+
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index d0eeada..0b35dd7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -50,4 +50,6 @@
 
     public JobInfo getJobInfo(JobId jobId) throws Exception;
 
+    public void stopCluster() throws Exception;
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e0bc9e2..c6ea57e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -54,6 +54,7 @@
 import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
 import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.ClusterShutdownWork;
 import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
 import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
@@ -66,6 +67,7 @@
 import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
 import edu.uci.ics.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.NotifyShutdownWork;
 import edu.uci.ics.hyracks.control.cc.work.NotifyStateDumpResponse;
 import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
@@ -85,8 +87,10 @@
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
+import edu.uci.ics.hyracks.control.common.shutdown.ShutdownRun;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -143,6 +147,8 @@
 
     private final Map<String, StateDumpRun> stateDumpRunMap;
 
+    private ShutdownRun shutdownCallback;
+
     public ClusterControllerService(final CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
@@ -250,11 +256,13 @@
     @Override
     public void stop() throws Exception {
         LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
-        executor.shutdownNow();
         webServer.stop();
         sweeper.cancel();
         workQueue.stop();
+        executor.shutdownNow();
+        clusterIPC.stop();
         jobLog.close();
+        clientIPC.stop();
         LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
     }
 
@@ -429,6 +437,10 @@
                             new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
+                case CLUSTER_SHUTDOWN: {
+                    workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this, new IPCResponder<Boolean>(handle,mid)));
+                    return;
+                }
             }
             try {
                 handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
@@ -562,6 +574,11 @@
                             dsrf.getStateDumpId(), dsrf.getState()));
                     return;
                 }
+                case SHUTDOWN_RESPONSE: {
+                    CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
+                    workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
+                    return;
+                }
             }
             LOGGER.warning("Unknown function: " + fn.getFunctionId());
         }
@@ -606,4 +623,12 @@
     public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
         deploymentRunMap.remove(deploymentKey);
     }
+
+    public synchronized void setShutdownRun(ShutdownRun sRun){
+        shutdownCallback = sRun;
+    }
+    public synchronized ShutdownRun getShutdownRun(){
+        return shutdownCallback;
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ClusterShutdownWork.java
new file mode 100644
index 0000000..c090e5e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.shutdown.ShutdownRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class ClusterShutdownWork extends SynchronizableWork {
+
+    private ClusterControllerService ccs;
+    private IResultCallback<Boolean> callback;
+    private static Logger LOGGER = Logger.getLogger(ClusterShutdownWork.class.getName());
+
+    public ClusterShutdownWork(ClusterControllerService ncs, IResultCallback<Boolean> callback) {
+        this.ccs = ncs;
+        this.callback = callback;
+    }
+
+    @Override
+    public void doRun() {
+        try {
+            if (ccs.getShutdownRun() != null) {
+                throw new IPCException("Shutdown in Progress");
+            }
+            Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
+            Set<String> nodeIds = new TreeSet<String>();
+            nodeIds.addAll(nodeControllerStateMap.keySet());
+            /**
+             * set up our listener for the node ACKs
+             */
+            final ShutdownRun shutdownStatus = new ShutdownRun(nodeIds);
+            // set up the CC to listen for it
+            ccs.setShutdownRun(shutdownStatus);
+            /**
+             * Shutdown all the nodes...
+             */
+            for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+                ncs.getNodeController().shutDown();
+            }
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        /**
+                         * wait for all our acks
+                         */
+                        boolean cleanShutdown = shutdownStatus.waitForCompletion();
+                        if (cleanShutdown) {
+                            callback.setValue(new Boolean(true));
+                            ccs.stop();
+                            LOGGER.info("JVM Exiting.. Bye!");
+                            Runtime rt = Runtime.getRuntime();
+                            rt.exit(0);
+                        }
+                        /**
+                         * best effort - just exit, user will have to kill misbehaving NCs
+                         */
+                        else {
+                            LOGGER.severe("Clean shutdown of NCs timed out- CC bailing out!");
+                            StringBuilder unresponsive = new StringBuilder();
+                            for (String s : shutdownStatus.getRemainingNodes()) {
+                                unresponsive.append(s + " ");
+                            }
+                            LOGGER.severe("Unresponsive Nodes: " + unresponsive);
+                            callback.setValue(new Boolean(false));
+                            ccs.stop();
+                            LOGGER.info("JVM Exiting.. Bye!");
+                            Runtime rt = Runtime.getRuntime();
+                            rt.exit(1);
+                        }
+                    } catch (Exception e) {
+                        callback.setException(e);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyShutdownWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyShutdownWork.java
new file mode 100644
index 0000000..aadfc09
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyShutdownWork.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.shutdown.ShutdownRun;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class NotifyShutdownWork extends SynchronizableWork {
+
+    private final ClusterControllerService ccs;
+    private final String nodeId;
+    private static Logger LOGGER = Logger.getLogger(NotifyShutdownWork.class.getName());
+
+    public NotifyShutdownWork(ClusterControllerService ccs, String nodeId) {
+        this.ccs = ccs;
+        this.nodeId = nodeId;
+
+    }
+
+    @Override
+    public void doRun() {
+        /** triggered remotely by a NC to notify that the NC is shutting down */
+        ShutdownRun sRun = ccs.getShutdownRun();
+        LOGGER.info("Recieved shutdown acknowledgement from NC ID:" + nodeId);
+        sRun.notifyShutdown(nodeId);
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 659dc23..3c67e17 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -46,6 +46,8 @@
 
     public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
 
+    public void notifyShutdown(String nodeId) throws Exception;
+
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
 
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
@@ -56,8 +58,8 @@
 
     public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition,
-            int nPartitions, NetworkAddress networkAddress) throws Exception;
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index ec95d58..d0631cd 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -46,4 +46,6 @@
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
 
     public void dumpState(String stateDumpId) throws Exception;
+
+    public void shutDown() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 1417df9..a1eb22e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -91,6 +91,8 @@
         DEPLOY_BINARY,
         NOTIFY_DEPLOY_BINARY,
         UNDEPLOY_BINARY,
+        SHUTDOWN_REQUEST,
+        SHUTDOWN_RESPONSE,
 
         STATE_DUMP_REQUEST,
         STATE_DUMP_RESPONSE,
@@ -983,6 +985,36 @@
         }
     }
 
+    public static class RequestShutdownFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SHUTDOWN_REQUEST;
+        }
+
+    }
+
+    public static class NotifyShutdownFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        public NotifyShutdownFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SHUTDOWN_RESPONSE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+    }
+
     public static class NotifyDeployBinaryFunction extends Function {
         private static final long serialVersionUID = 1L;
 
@@ -1068,6 +1100,33 @@
         }
     }
 
+    public static class ShutdownRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SHUTDOWN_REQUEST;
+        }
+    }
+
+    public static class ShutdownResponseFunction extends Function {
+
+        private final String nodeId;
+
+        public ShutdownResponseFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.SHUTDOWN_RESPONSE;
+        }
+    }
+
     public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
         private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 6bb1a93..14ed22e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -144,5 +144,10 @@
                 state);
         ipcHandle.send(-1, fn, null);
     }
+    @Override
+    public void notifyShutdown(String nodeId) throws Exception{
+        CCNCFunctions.ShutdownResponseFunction sdrf = new CCNCFunctions.ShutdownResponseFunction(nodeId);
+        ipcHandle.send(-1,sdrf,null);
+    }
 
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2484a98..a9d3d2b 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,4 +84,10 @@
         CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
         ipcHandle.send(-1, dsf, null);
     }
+
+    @Override
+    public void shutDown() throws Exception {
+        CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction();
+        ipcHandle.send(-1, sdrf, null);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
new file mode 100644
index 0000000..a379197
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.hyracks.control.common.shutdown;
+
+public interface IShutdownStatusConditionVariable {
+    /**
+     * @return true if all nodes ack shutdown
+     * @throws Exception
+     */
+    public boolean waitForCompletion() throws Exception;
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/ShutdownRun.java
new file mode 100644
index 0000000..c8b824f
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/ShutdownRun.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.shutdown;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+public class ShutdownRun implements IShutdownStatusConditionVariable{
+
+    private final Set<String> shutdownNodeIds = new TreeSet<String>();
+    private boolean shutdownSuccess = false;
+    private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds
+
+    public ShutdownRun(Set<String> nodeIds) {
+        shutdownNodeIds.addAll(nodeIds);
+    }
+
+    /**
+     * Notify that a node is shutting down.
+     *
+     * @param nodeId
+     * @param status
+     */
+    public synchronized void notifyShutdown(String nodeId) {
+        shutdownNodeIds.remove(nodeId);
+        if (shutdownNodeIds.size() == 0) {
+            shutdownSuccess = true;
+            notifyAll();
+        }
+    }
+
+    @Override
+    public synchronized boolean waitForCompletion() throws Exception {
+        /*
+         * Either be woken up when we're done, or default to fail.
+         */
+        wait(SHUTDOWN_TIMER_MS);
+        return shutdownSuccess;
+    }
+    
+    public synchronized Set<String> getRemainingNodes(){
+        return shutdownNodeIds;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 5eec7bb..e9f55fb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -82,6 +82,7 @@
 import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
 import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.ShutdownWork;
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 import edu.uci.ics.hyracks.control.nc.work.StateDumpWork;
 import edu.uci.ics.hyracks.control.nc.work.UnDeployBinaryWork;
@@ -549,6 +550,10 @@
                     queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
                     return;
                 }
+                case SHUTDOWN_REQUEST: {
+                    queue.schedule(new ShutdownWork(NodeControllerService.this));
+                    return;
+                }
             }
             throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ShutdownWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ShutdownWork.java
new file mode 100644
index 0000000..93efd79
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ShutdownWork.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.URL;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class ShutdownWork extends AbstractWork {
+
+    private final NodeControllerService ncs;
+    private static Logger LOGGER = Logger.getLogger(ShutdownWork.class.getName());
+
+    public ShutdownWork(NodeControllerService ncs) {
+        this.ncs = ncs;
+    }
+
+    @Override
+    public void run() {
+        try {
+            IClusterController ccs = ncs.getClusterController();
+            ccs.notifyShutdown(ncs.getId());
+            LOGGER.info("JVM Exiting.. Bye!");
+            //run the shutdown in a new thread, so we don't block this last work task
+            Thread t = new Thread() {
+                public void run() {
+                    try {
+                        ncs.stop();
+                    } catch (Exception e) {
+                        LOGGER.severe(e.getMessage());
+                    } finally {
+                        Runtime rt = Runtime.getRuntime();
+                        rt.exit(0);
+                    }
+                }
+            };
+            t.start();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml b/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
new file mode 100644
index 0000000..3f404de
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
@@ -0,0 +1,182 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+  <artifactId>hyracks-shutdown-test</artifactId>
+  <name>hyracks-shutdown-test</name>
+
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks-examples</artifactId>
+    <version>0.2.12-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <version>1.3</version>
+        <executions>
+          <execution>
+            <configuration>
+              <programs>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+                  <name>hyrackscc</name>
+                </program>
+                <program>
+                  <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+                  <name>hyracksnc</name>
+                </program>
+              </programs>
+              <repositoryLayout>flat</repositoryLayout>
+              <repositoryName>lib</repositoryName>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>assemble</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.2-beta-5</version>
+        <executions>
+          <execution>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+              </descriptors>
+            </configuration>
+            <phase>package</phase>
+            <goals>
+              <goal>attached</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-virtualcluster-maven-plugin</artifactId>
+      <version>0.2.12-SNAPSHOT</version>
+        <configuration>
+          <hyracksServerHome>${basedir}/target/hyracks-shutdown-test-${project.version}-binary-assembly</hyracksServerHome>
+          <jvmOptions>${jvm.extraargs}</jvmOptions>
+        </configuration>
+        <executions>
+          <execution>
+            <id>hyracks-cc-start</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>start-cc</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>hyracks-nc1-start</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>start-nc</goal>
+            </goals>
+            <configuration>
+              <nodeId>NC1</nodeId>
+              <dataIpAddress>127.0.0.1</dataIpAddress>
+              <ccHost>localhost</ccHost>
+            </configuration>
+          </execution>
+          <execution>
+            <id>hyracks-nc2-start</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>start-nc</goal>
+            </goals>
+            <configuration>
+              <nodeId>NC2</nodeId>
+              <dataIpAddress>127.0.0.1</dataIpAddress>
+              <ccHost>localhost</ccHost>
+            </configuration>
+          </execution>
+          <execution>
+            <id>stop-services</id>
+            <phase>post-integration-test</phase>
+            <goals>
+              <goal>stop-services</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-compiler-plugin</artifactId>
+      <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-failsafe-plugin</artifactId>
+      <version>2.8.1</version>
+      <executions>
+        <execution>
+          <id>it</id>
+          <phase>integration-test</phase>
+          <goals>
+            <goal>integration-test</goal>
+          </goals>
+        </execution>
+      </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <!-- Dependency management inherited from top-level hyracks -->
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>texthelper</artifactId>
+      <version>0.2.12-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-control-cc</artifactId>
+      <version>0.2.12-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-control-nc</artifactId>
+      <version>0.2.12-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>textclient</artifactId>
+      <version>0.2.12-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-examples/hyracks-shutdown-test/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/hyracks-shutdown-test/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..ae362ca
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-shutdown-test/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>docs</directory>
+      <outputDirectory>docs</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/edu/uci/ics/hyracks/examples/shutdown/test/ClusterShutdownIT.java b/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/edu/uci/ics/hyracks/examples/shutdown/test/ClusterShutdownIT.java
new file mode 100644
index 0000000..ee9bad5
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/edu/uci/ics/hyracks/examples/shutdown/test/ClusterShutdownIT.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.shutdown.test;
+
+import java.net.ServerSocket;
+import java.util.logging.Logger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class ClusterShutdownIT {
+    private static Logger LOGGER = Logger.getLogger(ClusterShutdownIT.class.getName());
+    @Rule
+    public ExpectedException closeTwice = ExpectedException.none();
+    @Test
+    public void runShutdown() throws Exception {
+        IHyracksClientConnection hcc = new HyracksConnection("localhost", 1098);
+        hcc.stopCluster();
+        //what happens here...
+        closeTwice.expect(IPCException.class);
+        closeTwice.expectMessage("Cannot send on a closed handle");
+        hcc.stopCluster();
+        ServerSocket c = null;
+        ServerSocket s = null;
+        try {
+            c = new ServerSocket(1098);
+            //we should be able to bind to this 
+            s = new ServerSocket(1099);
+            //and we should be able to bind to this too 
+        } catch (Exception e) {
+            LOGGER.severe(e.getMessage());
+            throw e;
+        } finally {
+            s.close();
+            c.close();
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-examples/pom.xml b/hyracks/hyracks-examples/pom.xml
index c65e77b..95381eb 100644
--- a/hyracks/hyracks-examples/pom.xml
+++ b/hyracks/hyracks-examples/pom.xml
@@ -30,5 +30,6 @@
     <module>btree-example</module>
     <module>hyracks-integration-tests</module>
     <module>hadoop-compat-example</module>
+    <module>hyracks-shutdown-test</module>
   </modules>
 </project>
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
index 214cc50..bb43a8c 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -26,4 +26,6 @@
     public void setAttachment(Object attachment);
 
     public Object getAttachment();
+    
+    public boolean isConnected();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
index 155e1ac..e2881ae 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
@@ -17,6 +17,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
 public class RPCInterface implements IIPCI {
     private final Map<Long, Request> reqMap;
 
@@ -26,9 +28,13 @@
 
     public Object call(IIPCHandle handle, Object request) throws Exception {
         Request req;
+        long mid;
         synchronized (this) {
-            req = new Request();
-            long mid = handle.send(-1, request, null);
+            if (!handle.isConnected()) {
+                throw new IPCException("Cannot send on a closed handle");
+            }
+            req = new Request(handle, this);
+            mid = handle.send(-1, request, null);
             reqMap.put(mid, req);
         }
         return req.getResponse();
@@ -48,14 +54,19 @@
         }
     }
 
+    protected synchronized void removeRequest(Request r) {
+        reqMap.remove(r);
+    }
+
     private static class Request {
+
         private boolean pending;
 
         private Object result;
 
         private Exception exception;
 
-        Request() {
+        Request(IIPCHandle carrier, RPCInterface parent) {
             pending = true;
             result = null;
             exception = null;
@@ -82,5 +93,6 @@
             }
             return result;
         }
+
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 0f76343..9e7198b 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -51,6 +51,9 @@
     public void start() {
         cMgr.start();
     }
+    public void stop() throws IOException{
+        cMgr.stop();
+    }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
         try {
@@ -61,7 +64,7 @@
             throw new IPCException(e);
         }
     }
-
+    
     IPayloadSerializerDeserializer getSerializerDeserializer() {
         return serde;
     }