add undeployment

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3368 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 74cee5f..0467eae 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -38,7 +38,8 @@
         GET_DATASET_RESULT_LOCATIONS,
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO,
-        CLI_DEPLOY_BINARY
+        CLI_DEPLOY_BINARY,
+        CLI_UNDEPLOY_BINARY
     }
 
     public abstract static class Function implements Serializable {
@@ -240,4 +241,22 @@
             return deploymentId;
         }
     }
+
+    public static class CliUnDeployBinaryFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        private final DeploymentId deploymentId;
+
+        public CliUnDeployBinaryFunction(DeploymentId deploymentId) {
+            this.deploymentId = deploymentId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLI_UNDEPLOY_BINARY;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 70c4b8c..8f1c401 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -103,4 +103,11 @@
                 binaryURLs, deploymentId);
         rpci.call(ipcHandle, dbf);
     }
+
+    @Override
+    public void unDeployBinary(DeploymentId deploymentId) throws Exception {
+        HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction(
+                deploymentId);
+        rpci.call(ipcHandle, dbf);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index e70a206..9c0ba3b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -45,5 +45,7 @@
 
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
 
+    public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index d92315d..789e48f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -51,6 +51,7 @@
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
 import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
@@ -388,6 +389,13 @@
                             .getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
+
+                case CLI_UNDEPLOY_BINARY: {
+                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+                    workQueue
+                            .schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId()));
+                    return;
+                }
             }
             try {
                 handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
index 26b30c5..a4f34af 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -54,7 +54,7 @@
              * Deploy for the cluster controller
              */
             DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getApplicationContext()
-                    .getJobSerializerDeserializerContainer());
+                    .getJobSerializerDeserializerContainer(), ccs.getServerContext());
 
             /**
              * Deploy for the node controllers
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
new file mode 100644
index 0000000..584ab7c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliUnDeployBinaryWork.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class CliUnDeployBinaryWork extends AbstractWork {
+
+    private ClusterControllerService ccs;
+    private DeploymentId deploymentId;
+
+    public CliUnDeployBinaryWork(ClusterControllerService ncs, DeploymentId deploymentId) {
+        this.ccs = ncs;
+        this.deploymentId = deploymentId;
+    }
+
+    @Override
+    public void run() {
+        try {
+            if (deploymentId == null) {
+                deploymentId = new DeploymentId(UUID.randomUUID().toString());
+            }
+            /**
+             * Deploy for the cluster controller
+             */
+            DeploymentUtils.undeploy(deploymentId, ccs.getApplicationContext().getJobSerializerDeserializerContainer());
+
+            /**
+             * Deploy for the node controllers
+             */
+            Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
+
+            Set<String> nodeIds = new TreeSet<String>();
+            for (String nc : nodeControllerStateMap.keySet()) {
+                nodeIds.add(nc);
+            }
+            DeploymentRun dRun = new DeploymentRun(nodeIds);
+            ccs.addDeploymentRun(deploymentId, dRun);
+
+            /***
+             * deploy binaries to each node controller
+             */
+            for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+                ncs.getNodeController().undeployBinary(deploymentId);
+            }
+            
+            /**
+             * wait for completion
+             */
+            dRun.waitForCompletion();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 49ea21e..cf27740 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -31,7 +31,8 @@
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public interface INodeController {
-    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+            List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
@@ -41,4 +42,6 @@
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
 
     public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
+
+    public void undeployBinary(DeploymentId deploymentId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 68fae70..ab84065 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -1,25 +1,40 @@
 package edu.uci.ics.hyracks.control.common.deployment;
 
+import java.io.File;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.io.FileUtils;
+
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
 public class DeploymentUtils {
 
-    public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container)
+    private static final String DEPLOYMENT = "deployment";
+
+    public static void undeploy(DeploymentId deploymentId, IJobSerializerDeserializerContainer container)
             throws HyracksException {
+        container.removeJobSerializerDeserializer(deploymentId);
+    }
+
+    public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
+            ServerContext ctx) throws HyracksException {
         IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
         if (jobSerDe == null) {
             jobSerDe = new ClassLoaderJobSerializerDeserializer();
             container.addJobSerializerDeserializer(deploymentId, jobSerDe);
         }
-        jobSerDe.addClassPathURLs(urls);
+        String rootDir = ctx.getBaseDir().toString();
+        String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT : rootDir + File.separator
+                + DEPLOYMENT;
+        jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir));
     }
 
     public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
@@ -35,4 +50,25 @@
         }
     }
 
+    private static List<URL> downloadURLs(List<URL> urls, String deploymentDir) throws HyracksException {
+        try {
+            List<URL> downloadedFileURLs = new ArrayList<URL>();
+            File dir = new File(deploymentDir);
+            if (!dir.exists()) {
+                FileUtils.forceMkdir(dir);
+            }
+            for (URL url : urls) {
+                String urlString = url.toString();
+                int slashIndex = urlString.lastIndexOf('/');
+                String fileName = urlString.substring(slashIndex + 1);
+                String filePath = deploymentDir + File.separator + fileName;
+                File targetFile = new File(filePath);
+                FileUtils.copyURLToFile(url, targetFile);
+                downloadedFileURLs.add(targetFile.toURI().toURL());
+            }
+            return downloadedFileURLs;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index bb84e4f..e343657 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -86,6 +86,7 @@
 
         DEPLOY_BINARY,
         NOTIFY_DEPLOY_BINARY,
+        UNDEPLOY_BINARY,
 
         OTHER
     }
@@ -794,6 +795,25 @@
         }
     }
 
+    public static class UnDeployBinaryFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final DeploymentId deploymentId;
+
+        public UnDeployBinaryFunction(DeploymentId deploymentId) {
+            this.deploymentId = deploymentId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNDEPLOY_BINARY;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+    }
+
     public static class NotifyDeployBinaryFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 6d6ef41..8346ecb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -72,4 +72,10 @@
         CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
         ipcHandle.send(-1, rpaf, null);
     }
+
+    @Override
+    public void undeployBinary(DeploymentId deploymentId) throws Exception {
+        CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId);
+        ipcHandle.send(-1, rpaf, null);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c6eee89..a1c3b08 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -77,6 +77,7 @@
 import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.UnDeployBinaryWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
 import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
@@ -451,8 +452,9 @@
                 }
                 case START_TASKS: {
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), stf
-                            .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
+                            stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
+                            stf.getFlags()));
                     return;
                 }
 
@@ -493,6 +495,12 @@
                             .getBinaryURLs()));
                     return;
                 }
+
+                case UNDEPLOY_BINARY: {
+                    CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
+                    queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+                    return;
+                }
             }
             throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
index 6c20722..eecf67f 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -26,7 +26,7 @@
     public void run() {
         try {
             DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getApplicationContext()
-                    .getJobSerializerDeserializerContainer());
+                    .getJobSerializerDeserializerContainer(), ncs.getServerContext());
             IClusterController ccs = ncs.getClusterController();
             ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
         } catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
new file mode 100644
index 0000000..38a764b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class UnDeployBinaryWork extends AbstractWork {
+
+    private DeploymentId deploymentId;
+    private NodeControllerService ncs;
+
+    public UnDeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId) {
+        this.deploymentId = deploymentId;
+        this.ncs = ncs;
+    }
+
+    @Override
+    public void run() {
+        try {
+            DeploymentUtils.undeploy(deploymentId, ncs.getApplicationContext().getJobSerializerDeserializerContainer());
+            IClusterController ccs = ncs.getClusterController();
+            ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}