add undeployment
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3368 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 74cee5f..0467eae 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -38,7 +38,8 @@
GET_DATASET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO,
- CLI_DEPLOY_BINARY
+ CLI_DEPLOY_BINARY,
+ CLI_UNDEPLOY_BINARY
}
public abstract static class Function implements Serializable {
@@ -240,4 +241,22 @@
return deploymentId;
}
}
+
+ public static class CliUnDeployBinaryFunction extends Function {
+ private static final long serialVersionUID = 1L;
+ private final DeploymentId deploymentId;
+
+ public CliUnDeployBinaryFunction(DeploymentId deploymentId) {
+ this.deploymentId = deploymentId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLI_UNDEPLOY_BINARY;
+ }
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 70c4b8c..8f1c401 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -103,4 +103,11 @@
binaryURLs, deploymentId);
rpci.call(ipcHandle, dbf);
}
+
+ @Override
+ public void unDeployBinary(DeploymentId deploymentId) throws Exception {
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(
+ deploymentId);
+ rpci.call(ipcHandle, dbf);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index e70a206..9c0ba3b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -45,5 +45,7 @@
public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
+ public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+
public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index d92315d..789e48f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -51,6 +51,7 @@
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
@@ -388,6 +389,13 @@
.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
return;
}
+
+ case CLI_UNDEPLOY_BINARY: {
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+ workQueue
+ .schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId()));
+ return;
+ }
}
try {
handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
index 26b30c5..a4f34af 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -54,7 +54,7 @@
* Deploy for the cluster controller
*/
DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getApplicationContext()
- .getJobSerializerDeserializerContainer());
+ .getJobSerializerDeserializerContainer(), ccs.getServerContext());
/**
* Deploy for the node controllers
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
new file mode 100644
index 0000000..584ab7c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class CliUnDeployBinaryWork extends AbstractWork {
+
+ private ClusterControllerService ccs;
+ private DeploymentId deploymentId;
+
+ public CliUnDeployBinaryWork(ClusterControllerService ncs, DeploymentId deploymentId) {
+ this.ccs = ncs;
+ this.deploymentId = deploymentId;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (deploymentId == null) {
+ deploymentId = new DeploymentId(UUID.randomUUID().toString());
+ }
+ /**
+ * Deploy for the cluster controller
+ */
+ DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer());
+
+ /**
+ * Deploy for the node controllers
+ */
+ Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
+
+ Set<String> nodeIds = new TreeSet<String>();
+ for (String nc : nodeControllerStateMap.keySet()) {
+ nodeIds.add(nc);
+ }
+ DeploymentRun dRun = new DeploymentRun(nodeIds);
+ ccs.addDeploymentRun(deploymentId, dRun);
+
+ /***
+ * deploy binaries to each node controller
+ */
+ for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+ ncs.getNodeController().undeployBinary(deploymentId);
+ }
+
+ /**
+ * wait for completion
+ */
+ dRun.waitForCompletion();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 49ea21e..cf27740 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -31,7 +31,8 @@
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
- public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
@@ -41,4 +42,6 @@
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
+
+ public void undeployBinary(DeploymentId deploymentId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 68fae70..ab84065 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -1,25 +1,40 @@
package edu.uci.ics.hyracks.control.common.deployment;
+import java.io.File;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.io.FileUtils;
+
import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
public class DeploymentUtils {
- public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container)
+ private static final String DEPLOYMENT = "deployment";
+
+ public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserializerContainer container)
throws HyracksException {
+ container.removeJobSerializerDeserializer(deploymentId);
+ }
+
+ public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
+ ServerContext ctx) throws HyracksException {
IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
if (jobSerDe == null) {
jobSerDe = new ClassLoaderJobSerializerDeserializer();
container.addJobSerializerDeserializer(deploymentId, jobSerDe);
}
- jobSerDe.addClassPathURLs(urls);
+ String rootDir = ctx.getBaseDir().toString();
+ String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT : rootDir + File.separator
+ + DEPLOYMENT;
+ jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir));
}
public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
@@ -35,4 +50,25 @@
}
}
+ private static List<URL> downloadURLs(List<URL> urls, String deploymentDir) throws HyracksException {
+ try {
+ List<URL> downloadedFileURLs = new ArrayList<URL>();
+ File dir = new File(deploymentDir);
+ if (!dir.exists()) {
+ FileUtils.forceMkdir(dir);
+ }
+ for (URL url : urls) {
+ String urlString = url.toString();
+ int slashIndex = urlString.lastIndexOf('/');
+ String fileName = urlString.substring(slashIndex + 1);
+ String filePath = deploymentDir + File.separator + fileName;
+ File targetFile = new File(filePath);
+ FileUtils.copyURLToFile(url, targetFile);
+ downloadedFileURLs.add(targetFile.toURI().toURL());
+ }
+ return downloadedFileURLs;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index bb84e4f..e343657 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -86,6 +86,7 @@
DEPLOY_BINARY,
NOTIFY_DEPLOY_BINARY,
+ UNDEPLOY_BINARY,
OTHER
}
@@ -794,6 +795,25 @@
}
}
+ public static class UnDeployBinaryFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final DeploymentId deploymentId;
+
+ public UnDeployBinaryFunction(DeploymentId deploymentId) {
+ this.deploymentId = deploymentId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UNDEPLOY_BINARY;
+ }
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+ }
+
public static class NotifyDeployBinaryFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 6d6ef41..8346ecb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -72,4 +72,10 @@
CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
ipcHandle.send(-1, rpaf, null);
}
+
+ @Override
+ public void undeployBinary(DeploymentId deploymentId) throws Exception {
+ CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId);
+ ipcHandle.send(-1, rpaf, null);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c6eee89..a1c3b08 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -77,6 +77,7 @@
import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.UnDeployBinaryWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
@@ -451,8 +452,9 @@
}
case START_TASKS: {
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
- queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), stf
- .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
+ stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
+ stf.getFlags()));
return;
}
@@ -493,6 +495,12 @@
.getBinaryURLs()));
return;
}
+
+ case UNDEPLOY_BINARY: {
+ CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
+ queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+ return;
+ }
}
throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
index 6c20722..eecf67f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -26,7 +26,7 @@
public void run() {
try {
DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getApplicationContext()
- .getJobSerializerDeserializerContainer());
+ .getJobSerializerDeserializerContainer(), ncs.getServerContext());
IClusterController ccs = ncs.getClusterController();
ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
new file mode 100644
index 0000000..38a764b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class UnDeployBinaryWork extends AbstractWork {
+
+ private DeploymentId deploymentId;
+ private NodeControllerService ncs;
+
+ public UnDeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId) {
+ this.deploymentId = deploymentId;
+ this.ncs = ncs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer());
+ IClusterController ccs = ncs.getClusterController();
+ ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}