Add Shutdown via API to Hyracks
This change adds a method to HyracksConnection called stopCluster().
When the CC recieves a message from this, it asks all NC tasks to close
and acknowledge that they have recieved the message and are closing.
If all NCs have closed, or a 10 second timeout elapses, the CC then
exits with a 0 return code if all NCs closed, or a 1 if some did
not acknowledge the shutdown request.
Change-Id: Iaf3d395dc7964e114d4929830f40063f58e0d5da
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/76
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Vinayak Borkar <vinayakb@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 9ff741e..2c76fc1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -40,7 +40,8 @@
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO,
CLI_DEPLOY_BINARY,
- CLI_UNDEPLOY_BINARY
+ CLI_UNDEPLOY_BINARY,
+ CLUSTER_SHUTDOWN
}
public abstract static class Function implements Serializable {
@@ -279,4 +280,14 @@
return deploymentId;
}
}
+
+ public static class ClusterShutdownFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLUSTER_SHUTDOWN;
+ }
+ }
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 98f27f2..176c930 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
private final IIPCHandle ipcHandle;
@@ -111,4 +112,19 @@
jobId);
return (JobInfo) rpci.call(ipcHandle, gjsf);
}
+
+ @Override
+ public void stopCluster() throws Exception {
+ HyracksClientInterfaceFunctions.ClusterShutdownFunction csdf = new HyracksClientInterfaceFunctions.ClusterShutdownFunction();
+ rpci.call(ipcHandle, csdf);
+ //give the CC some time to do final settling after it returns our request
+ for (int i = 3; ipcHandle.isConnected() && i > 0; i--) {
+ synchronized (this) {
+ wait(3000l); //3sec
+ }
+ }
+ if (ipcHandle.isConnected()) {
+ throw new IPCException("CC refused to release connection after 9 seconds");
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 1916360..e6bfe47 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -182,4 +182,8 @@
public JobInfo getJobInfo(JobId jobId) throws Exception {
return hci.getJobInfo(jobId);
}
+ @Override
+ public void stopCluster() throws Exception{
+ hci.stopCluster();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 1e44e91..6d1e5a2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -179,4 +179,9 @@
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception;
+ /**
+ * Shuts down all NCs and then the CC.
+ */
+ public void stopCluster() throws Exception;
+
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index d0eeada..0b35dd7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -50,4 +50,6 @@
public JobInfo getJobInfo(JobId jobId) throws Exception;
+ public void stopCluster() throws Exception;
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e0bc9e2..c6ea57e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -54,6 +54,7 @@
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.ClusterShutdownWork;
import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
@@ -66,6 +67,7 @@
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
import edu.uci.ics.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.NotifyShutdownWork;
import edu.uci.ics.hyracks.control.cc.work.NotifyStateDumpResponse;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
@@ -85,8 +87,10 @@
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
+import edu.uci.ics.hyracks.control.common.shutdown.ShutdownRun;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -143,6 +147,8 @@
private final Map<String, StateDumpRun> stateDumpRunMap;
+ private ShutdownRun shutdownCallback;
+
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
@@ -250,11 +256,13 @@
@Override
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
- executor.shutdownNow();
webServer.stop();
sweeper.cancel();
workQueue.stop();
+ executor.shutdownNow();
+ clusterIPC.stop();
jobLog.close();
+ clientIPC.stop();
LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
}
@@ -429,6 +437,10 @@
new IPCResponder<DeploymentId>(handle, mid)));
return;
}
+ case CLUSTER_SHUTDOWN: {
+ workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this, new IPCResponder<Boolean>(handle,mid)));
+ return;
+ }
}
try {
handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
@@ -562,6 +574,11 @@
dsrf.getStateDumpId(), dsrf.getState()));
return;
}
+ case SHUTDOWN_RESPONSE: {
+ CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
+ workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
+ return;
+ }
}
LOGGER.warning("Unknown function: " + fn.getFunctionId());
}
@@ -606,4 +623,12 @@
public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
deploymentRunMap.remove(deploymentKey);
}
+
+ public synchronized void setShutdownRun(ShutdownRun sRun){
+ shutdownCallback = sRun;
+ }
+ public synchronized ShutdownRun getShutdownRun(){
+ return shutdownCallback;
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ClusterShutdownWork.java
new file mode 100644
index 0000000..c090e5e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ClusterShutdownWork.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.shutdown.ShutdownRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class ClusterShutdownWork extends SynchronizableWork {
+
+ private ClusterControllerService ccs;
+ private IResultCallback<Boolean> callback;
+ private static Logger LOGGER = Logger.getLogger(ClusterShutdownWork.class.getName());
+
+ public ClusterShutdownWork(ClusterControllerService ncs, IResultCallback<Boolean> callback) {
+ this.ccs = ncs;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ if (ccs.getShutdownRun() != null) {
+ throw new IPCException("Shutdown in Progress");
+ }
+ Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
+ Set<String> nodeIds = new TreeSet<String>();
+ nodeIds.addAll(nodeControllerStateMap.keySet());
+ /**
+ * set up our listener for the node ACKs
+ */
+ final ShutdownRun shutdownStatus = new ShutdownRun(nodeIds);
+ // set up the CC to listen for it
+ ccs.setShutdownRun(shutdownStatus);
+ /**
+ * Shutdown all the nodes...
+ */
+ for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+ ncs.getNodeController().shutDown();
+ }
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ /**
+ * wait for all our acks
+ */
+ boolean cleanShutdown = shutdownStatus.waitForCompletion();
+ if (cleanShutdown) {
+ callback.setValue(new Boolean(true));
+ ccs.stop();
+ LOGGER.info("JVM Exiting.. Bye!");
+ Runtime rt = Runtime.getRuntime();
+ rt.exit(0);
+ }
+ /**
+ * best effort - just exit, user will have to kill misbehaving NCs
+ */
+ else {
+ LOGGER.severe("Clean shutdown of NCs timed out- CC bailing out!");
+ StringBuilder unresponsive = new StringBuilder();
+ for (String s : shutdownStatus.getRemainingNodes()) {
+ unresponsive.append(s + " ");
+ }
+ LOGGER.severe("Unresponsive Nodes: " + unresponsive);
+ callback.setValue(new Boolean(false));
+ ccs.stop();
+ LOGGER.info("JVM Exiting.. Bye!");
+ Runtime rt = Runtime.getRuntime();
+ rt.exit(1);
+ }
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyShutdownWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyShutdownWork.java
new file mode 100644
index 0000000..aadfc09
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyShutdownWork.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.shutdown.ShutdownRun;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class NotifyShutdownWork extends SynchronizableWork {
+
+ private final ClusterControllerService ccs;
+ private final String nodeId;
+ private static Logger LOGGER = Logger.getLogger(NotifyShutdownWork.class.getName());
+
+ public NotifyShutdownWork(ClusterControllerService ccs, String nodeId) {
+ this.ccs = ccs;
+ this.nodeId = nodeId;
+
+ }
+
+ @Override
+ public void doRun() {
+ /** triggered remotely by a NC to notify that the NC is shutting down */
+ ShutdownRun sRun = ccs.getShutdownRun();
+ LOGGER.info("Recieved shutdown acknowledgement from NC ID:" + nodeId);
+ sRun.notifyShutdown(nodeId);
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 659dc23..3c67e17 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -46,6 +46,8 @@
public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
+ public void notifyShutdown(String nodeId) throws Exception;
+
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
@@ -56,8 +58,8 @@
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition,
- int nPartitions, NetworkAddress networkAddress) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index ec95d58..d0631cd 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -46,4 +46,6 @@
public void undeployBinary(DeploymentId deploymentId) throws Exception;
public void dumpState(String stateDumpId) throws Exception;
+
+ public void shutDown() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 1417df9..a1eb22e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -91,6 +91,8 @@
DEPLOY_BINARY,
NOTIFY_DEPLOY_BINARY,
UNDEPLOY_BINARY,
+ SHUTDOWN_REQUEST,
+ SHUTDOWN_RESPONSE,
STATE_DUMP_REQUEST,
STATE_DUMP_RESPONSE,
@@ -983,6 +985,36 @@
}
}
+ public static class RequestShutdownFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.SHUTDOWN_REQUEST;
+ }
+
+ }
+
+ public static class NotifyShutdownFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public NotifyShutdownFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.SHUTDOWN_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ }
+
public static class NotifyDeployBinaryFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -1068,6 +1100,33 @@
}
}
+ public static class ShutdownRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.SHUTDOWN_REQUEST;
+ }
+ }
+
+ public static class ShutdownResponseFunction extends Function {
+
+ private final String nodeId;
+
+ public ShutdownResponseFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.SHUTDOWN_RESPONSE;
+ }
+ }
+
public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 6bb1a93..14ed22e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -144,5 +144,10 @@
state);
ipcHandle.send(-1, fn, null);
}
+ @Override
+ public void notifyShutdown(String nodeId) throws Exception{
+ CCNCFunctions.ShutdownResponseFunction sdrf = new CCNCFunctions.ShutdownResponseFunction(nodeId);
+ ipcHandle.send(-1,sdrf,null);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2484a98..a9d3d2b 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -84,4 +84,10 @@
CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
ipcHandle.send(-1, dsf, null);
}
+
+ @Override
+ public void shutDown() throws Exception {
+ CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction();
+ ipcHandle.send(-1, sdrf, null);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
new file mode 100644
index 0000000..a379197
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.hyracks.control.common.shutdown;
+
+public interface IShutdownStatusConditionVariable {
+ /**
+ * @return true if all nodes ack shutdown
+ * @throws Exception
+ */
+ public boolean waitForCompletion() throws Exception;
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/ShutdownRun.java
new file mode 100644
index 0000000..c8b824f
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/shutdown/ShutdownRun.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.shutdown;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+public class ShutdownRun implements IShutdownStatusConditionVariable{
+
+ private final Set<String> shutdownNodeIds = new TreeSet<String>();
+ private boolean shutdownSuccess = false;
+ private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds
+
+ public ShutdownRun(Set<String> nodeIds) {
+ shutdownNodeIds.addAll(nodeIds);
+ }
+
+ /**
+ * Notify that a node is shutting down.
+ *
+ * @param nodeId
+ * @param status
+ */
+ public synchronized void notifyShutdown(String nodeId) {
+ shutdownNodeIds.remove(nodeId);
+ if (shutdownNodeIds.size() == 0) {
+ shutdownSuccess = true;
+ notifyAll();
+ }
+ }
+
+ @Override
+ public synchronized boolean waitForCompletion() throws Exception {
+ /*
+ * Either be woken up when we're done, or default to fail.
+ */
+ wait(SHUTDOWN_TIMER_MS);
+ return shutdownSuccess;
+ }
+
+ public synchronized Set<String> getRemainingNodes(){
+ return shutdownNodeIds;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 5eec7bb..e9f55fb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -82,6 +82,7 @@
import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.ShutdownWork;
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
import edu.uci.ics.hyracks.control.nc.work.StateDumpWork;
import edu.uci.ics.hyracks.control.nc.work.UnDeployBinaryWork;
@@ -549,6 +550,10 @@
queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
return;
}
+ case SHUTDOWN_REQUEST: {
+ queue.schedule(new ShutdownWork(NodeControllerService.this));
+ return;
+ }
}
throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ShutdownWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ShutdownWork.java
new file mode 100644
index 0000000..93efd79
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ShutdownWork.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.URL;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class ShutdownWork extends AbstractWork {
+
+ private final NodeControllerService ncs;
+ private static Logger LOGGER = Logger.getLogger(ShutdownWork.class.getName());
+
+ public ShutdownWork(NodeControllerService ncs) {
+ this.ncs = ncs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ IClusterController ccs = ncs.getClusterController();
+ ccs.notifyShutdown(ncs.getId());
+ LOGGER.info("JVM Exiting.. Bye!");
+ //run the shutdown in a new thread, so we don't block this last work task
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ ncs.stop();
+ } catch (Exception e) {
+ LOGGER.severe(e.getMessage());
+ } finally {
+ Runtime rt = Runtime.getRuntime();
+ rt.exit(0);
+ }
+ }
+ };
+ t.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml b/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
new file mode 100644
index 0000000..3f404de
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
@@ -0,0 +1,182 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks.examples.text</groupId>
+ <artifactId>hyracks-shutdown-test</artifactId>
+ <name>hyracks-shutdown-test</name>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-examples</artifactId>
+ <version>0.2.12-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.cc.CCDriver</mainClass>
+ <name>hyrackscc</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.hyracks.control.nc.NCDriver</mainClass>
+ <name>hyracksnc</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-virtualcluster-maven-plugin</artifactId>
+ <version>0.2.12-SNAPSHOT</version>
+ <configuration>
+ <hyracksServerHome>${basedir}/target/hyracks-shutdown-test-${project.version}-binary-assembly</hyracksServerHome>
+ <jvmOptions>${jvm.extraargs}</jvmOptions>
+ </configuration>
+ <executions>
+ <execution>
+ <id>hyracks-cc-start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start-cc</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>hyracks-nc1-start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start-nc</goal>
+ </goals>
+ <configuration>
+ <nodeId>NC1</nodeId>
+ <dataIpAddress>127.0.0.1</dataIpAddress>
+ <ccHost>localhost</ccHost>
+ </configuration>
+ </execution>
+ <execution>
+ <id>hyracks-nc2-start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start-nc</goal>
+ </goals>
+ <configuration>
+ <nodeId>NC2</nodeId>
+ <dataIpAddress>127.0.0.1</dataIpAddress>
+ <ccHost>localhost</ccHost>
+ </configuration>
+ </execution>
+ <execution>
+ <id>stop-services</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop-services</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.8.1</version>
+ <executions>
+ <execution>
+ <id>it</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <!-- Dependency management inherited from top-level hyracks -->
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>texthelper</artifactId>
+ <version>0.2.12-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.12-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.12-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>textclient</artifactId>
+ <version>0.2.12-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-examples/hyracks-shutdown-test/src/main/assembly/binary-assembly.xml b/hyracks/hyracks-examples/hyracks-shutdown-test/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..ae362ca
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-shutdown-test/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>docs</directory>
+ <outputDirectory>docs</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/edu/uci/ics/hyracks/examples/shutdown/test/ClusterShutdownIT.java b/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/edu/uci/ics/hyracks/examples/shutdown/test/ClusterShutdownIT.java
new file mode 100644
index 0000000..ee9bad5
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-shutdown-test/src/test/java/edu/uci/ics/hyracks/examples/shutdown/test/ClusterShutdownIT.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.examples.shutdown.test;
+
+import java.net.ServerSocket;
+import java.util.logging.Logger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class ClusterShutdownIT {
+ private static Logger LOGGER = Logger.getLogger(ClusterShutdownIT.class.getName());
+ @Rule
+ public ExpectedException closeTwice = ExpectedException.none();
+ @Test
+ public void runShutdown() throws Exception {
+ IHyracksClientConnection hcc = new HyracksConnection("localhost", 1098);
+ hcc.stopCluster();
+ //what happens here...
+ closeTwice.expect(IPCException.class);
+ closeTwice.expectMessage("Cannot send on a closed handle");
+ hcc.stopCluster();
+ ServerSocket c = null;
+ ServerSocket s = null;
+ try {
+ c = new ServerSocket(1098);
+ //we should be able to bind to this
+ s = new ServerSocket(1099);
+ //and we should be able to bind to this too
+ } catch (Exception e) {
+ LOGGER.severe(e.getMessage());
+ throw e;
+ } finally {
+ s.close();
+ c.close();
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-examples/pom.xml b/hyracks/hyracks-examples/pom.xml
index c65e77b..95381eb 100644
--- a/hyracks/hyracks-examples/pom.xml
+++ b/hyracks/hyracks-examples/pom.xml
@@ -30,5 +30,6 @@
<module>btree-example</module>
<module>hyracks-integration-tests</module>
<module>hadoop-compat-example</module>
+ <module>hyracks-shutdown-test</module>
</modules>
</project>
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
index 214cc50..bb43a8c 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -26,4 +26,6 @@
public void setAttachment(Object attachment);
public Object getAttachment();
+
+ public boolean isConnected();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
index 155e1ac..e2881ae 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
@@ -17,6 +17,8 @@
import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
public class RPCInterface implements IIPCI {
private final Map<Long, Request> reqMap;
@@ -26,9 +28,13 @@
public Object call(IIPCHandle handle, Object request) throws Exception {
Request req;
+ long mid;
synchronized (this) {
- req = new Request();
- long mid = handle.send(-1, request, null);
+ if (!handle.isConnected()) {
+ throw new IPCException("Cannot send on a closed handle");
+ }
+ req = new Request(handle, this);
+ mid = handle.send(-1, request, null);
reqMap.put(mid, req);
}
return req.getResponse();
@@ -48,14 +54,19 @@
}
}
+ protected synchronized void removeRequest(Request r) {
+ reqMap.remove(r);
+ }
+
private static class Request {
+
private boolean pending;
private Object result;
private Exception exception;
- Request() {
+ Request(IIPCHandle carrier, RPCInterface parent) {
pending = true;
result = null;
exception = null;
@@ -82,5 +93,6 @@
}
return result;
}
+
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 0f76343..9e7198b 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -51,6 +51,9 @@
public void start() {
cMgr.start();
}
+ public void stop() throws IOException{
+ cMgr.stop();
+ }
public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
try {
@@ -61,7 +64,7 @@
throw new IPCException(e);
}
}
-
+
IPayloadSerializerDeserializer getSerializerDeserializer() {
return serde;
}