revised skeleton for dynamic deployment

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3300 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 642f550..e689e81 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -203,10 +203,9 @@
             return FunctionId.GET_CLUSTER_TOPOLOGY;
         }
     }
-    
+
     public static class DeployBinaryFunction extends Function {
         private static final long serialVersionUID = 1L;
-
         private final List<URL> binaryURLs;
 
         public DeployBinaryFunction(List<URL> binaryURLs) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
new file mode 100644
index 0000000..7482ac9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.api.deployment;
+
+import java.io.Serializable;
+
+public class DeploymentId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String deploymentKey;
+
+    public DeploymentId(String deploymentKey) {
+        this.deploymentKey = deploymentKey;
+    }
+
+    public int hashCode() {
+        return deploymentKey.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof DeploymentId)) {
+            return false;
+        }
+        return ((DeploymentId) o).equals(deploymentKey);
+    }
+
+    @Override
+    public String toString() {
+        return deploymentKey;
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index b10d266..f480ced 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -49,7 +50,7 @@
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
-import edu.uci.ics.hyracks.control.cc.work.DeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
 import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
@@ -59,6 +60,7 @@
 import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
 import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
+import edu.uci.ics.hyracks.control.cc.work.NotifyDeployBinaryWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
@@ -74,6 +76,7 @@
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
@@ -127,6 +130,8 @@
 
     private long jobCounter;
 
+    private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
+
     public ClusterControllerService(final CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
@@ -173,6 +178,8 @@
         sweeper = new DeadNodeSweeper();
         datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
         jobCounter = 0;
+
+        deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -377,7 +384,7 @@
 
                 case DEPLOY_BINARY: {
                     HyracksClientInterfaceFunctions.DeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.DeployBinaryFunction) fn;
-                    workQueue.schedule(new DeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs()));
+                    workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs()));
                     return;
                 }
             }
@@ -421,6 +428,13 @@
                     return;
                 }
 
+                case NOTIFY_DEPLOY_BINARY: {
+                    CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
+                    workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this,
+                            ndbf.getDeploymentId(), ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+                    return;
+                }
+
                 case REPORT_PROFILE: {
                     CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
                     workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
@@ -503,4 +517,32 @@
             LOGGER.warning("Unknown function: " + fn.getFunctionId());
         }
     }
+
+    /**
+     * Add a deployment run
+     * 
+     * @param deploymentKey
+     * @param nodeControllerIds
+     */
+    public synchronized void addDeploymentRun(DeploymentId deploymentKey, DeploymentRun dRun) {
+        deploymentRunMap.put(deploymentKey, dRun);
+    }
+
+    /**
+     * Get a deployment run
+     * 
+     * @param deploymentKey
+     */
+    public synchronized DeploymentRun getDeploymentRun(DeploymentId deploymentKey) {
+        return deploymentRunMap.get(deploymentKey);
+    }
+
+    /**
+     * Remove a deployment run
+     * 
+     * @param deploymentKey
+     */
+    public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
+        deploymentRunMap.remove(deploymentKey);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
new file mode 100644
index 0000000..355289b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class CliDeployBinaryWork extends AbstractWork {
+
+    private ClusterControllerService ccs;
+    private List<URL> binaryURLs;
+
+    public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs) {
+        this.ccs = ncs;
+        this.binaryURLs = binaryURLs;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
+            DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
+            Set<String> nodeIds = new TreeSet<String>();
+            for (String nc : nodeControllerStateMap.keySet()) {
+                nodeIds.add(nc);
+            }
+            DeploymentRun dRun = new DeploymentRun(nodeIds);
+            ccs.addDeploymentRun(deploymentId, dRun);
+
+            /***
+             * deploy binaries to each node controller
+             */
+            for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+                ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
+            }
+
+            /**
+             * wait for completion
+             */
+            dRun.waitForCompletion();
+            ccs.removeDeploymentRun(deploymentId);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/DeployBinaryWork.java
deleted file mode 100644
index 5aeba4b..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/DeployBinaryWork.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.control.cc.work;
-
-import java.net.URL;
-import java.util.List;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class DeployBinaryWork extends AbstractWork {
-
-	private ClusterControllerService ccs;
-
-	public DeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs) {
-		this.ccs = ncs;
-	}
-
-	@Override
-	public void run() {
-
-	}
-
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
new file mode 100644
index 0000000..903cad3
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class NotifyDeployBinaryWork extends AbstractWork {
+
+    private final ClusterControllerService ccs;
+    private final String nodeId;
+    private final DeploymentId deploymentId;
+    private DeploymentStatus deploymentStatus;
+
+    public NotifyDeployBinaryWork(ClusterControllerService ccs, DeploymentId deploymentId, String nodeId,
+            DeploymentStatus deploymentStatus) {
+        this.ccs = ccs;
+        this.nodeId = nodeId;
+        this.deploymentId = deploymentId;
+        this.deploymentStatus = deploymentStatus;
+
+    }
+
+    @Override
+    public void run() {
+        // notify that one NC is deployed
+        DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
+        dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 924a8be..94e9f92 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -19,8 +19,10 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -39,7 +41,7 @@
 
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
 
-    public void notifyDeployBinary(String nodeId) throws Exception;
+    public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
 
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 1b7e8f6..0cfd161 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -39,5 +40,5 @@
 
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
 
-    public void deployBinary(List<URL> url) throws Exception;
+    public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentRun.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentRun.java
new file mode 100644
index 0000000..519905c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentRun.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.control.common.deployment;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+public class DeploymentRun implements IDeploymentStatusConditionVariable {
+
+    private DeploymentStatus deploymentStatus = DeploymentStatus.FAIL;
+    private final Set<String> deploymentNodeIds = new TreeSet<String>();
+
+    public DeploymentRun(Set<String> nodeIds) {
+        deploymentNodeIds.addAll(nodeIds);
+    }
+
+    public synchronized void notifyDeploymentStatus(String nodeId, DeploymentStatus status) {
+        deploymentNodeIds.remove(nodeId);
+        if (deploymentNodeIds.size() == 0) {
+            deploymentStatus = DeploymentStatus.SUCCEED;
+            notifyAll();
+        }
+    }
+
+    @Override
+    public synchronized DeploymentStatus waitForCompletion() throws Exception {
+        wait();
+        return deploymentStatus;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentStatus.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentStatus.java
new file mode 100644
index 0000000..3690518
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentStatus.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.deployment;
+
+public enum DeploymentStatus {
+    SUCCEED,
+    FAIL
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/IDeploymentStatusConditionVariable.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/IDeploymentStatusConditionVariable.java
new file mode 100644
index 0000000..06e398e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/IDeploymentStatusConditionVariable.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.deployment;
+
+public interface IDeploymentStatusConditionVariable {
+
+    public DeploymentStatus waitForCompletion() throws Exception;
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index c4968eb..d6ed7ec 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -38,12 +38,14 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -81,7 +83,8 @@
         SEND_APPLICATION_MESSAGE,
         GET_NODE_CONTROLLERS_INFO,
         GET_NODE_CONTROLLERS_INFO_RESPONSE,
-        
+
+        DEPLOY_BINARY,
         NOTIFY_DEPLOY_BINARY,
 
         OTHER
@@ -753,13 +756,42 @@
         }
     }
 
+    public static class DeployBinaryFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final List<URL> binaryURLs;
+        private final DeploymentId deploymentId;
+
+        public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs) {
+            this.binaryURLs = binaryURLs;
+            this.deploymentId = deploymentId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DEPLOY_BINARY;
+        }
+
+        public List<URL> getBinaryURLs() {
+            return binaryURLs;
+        }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+    }
+
     public static class NotifyDeployBinaryFunction extends Function {
         private static final long serialVersionUID = 1L;
 
         private final String nodeId;
+        private final DeploymentId deploymentId;
+        private final DeploymentStatus deploymentStatus;
 
-        public NotifyDeployBinaryFunction(String nodeId) {
+        public NotifyDeployBinaryFunction(DeploymentId deploymentId, String nodeId, DeploymentStatus deploymentStatus) {
             this.nodeId = nodeId;
+            this.deploymentId = deploymentId;
+            this.deploymentStatus = deploymentStatus;
         }
 
         @Override
@@ -770,6 +802,14 @@
         public String getNodeId() {
             return nodeId;
         }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+
+        public DeploymentStatus getDeploymentStatus() {
+            return deploymentStatus;
+        }
     }
 
     public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 49ab53b..10cd935 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -19,9 +19,11 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -70,8 +72,9 @@
     }
 
     @Override
-    public void notifyDeployBinary(String nodeId) throws Exception {
-        CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(nodeId);
+    public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception {
+        CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
+                nodeId, status);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 33f19ca..aaed36e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -19,11 +19,11 @@
 import java.util.List;
 import java.util.Map;
 
-import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -67,9 +67,8 @@
     }
 
     @Override
-    public void deployBinary(List<URL> binaryURLs) throws Exception {
-        HyracksClientInterfaceFunctions.DeployBinaryFunction rpaf = new HyracksClientInterfaceFunctions.DeployBinaryFunction(
-                binaryURLs);
+    public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
+        CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
         ipcHandle.send(-1, rpaf, null);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a97ec84..64ff8c8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -73,7 +73,7 @@
 import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
 import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
-import edu.uci.ics.hyracks.control.nc.work.DeployBinaryNotificationWork;
+import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -485,10 +485,11 @@
                     setNodeControllersInfo(gncirf.getNodeControllerInfos());
                     return;
                 }
-                
-                case NOTIFY_DEPLOY_BINARY: {
-                    CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
-                    queue.schedule(new DeployBinaryNotificationWork(NodeControllerService.this, ndbf.getNodeId()));
+
+                case DEPLOY_BINARY: {
+                    CCNCFunctions.DeployBinaryFunction ndbf = (CCNCFunctions.DeployBinaryFunction) fn;
+                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(), ndbf
+                            .getBinaryURLs()));
                     return;
                 }
             }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryNotificationWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryNotificationWork.java
deleted file mode 100644
index 15a2095..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryNotificationWork.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.control.nc.work;
-
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class DeployBinaryNotificationWork extends AbstractWork {
-
-	private NodeControllerService ccs;
-
-	public DeployBinaryNotificationWork(NodeControllerService ccs, String nodeId) {
-		this.ccs = ccs;
-	}
-
-	@Override
-	public void run() {
-
-	}
-
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
new file mode 100644
index 0000000..c56a2e5
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.URL;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class DeployBinaryWork extends AbstractWork {
+
+    private DeploymentId deploymentId;
+    private NodeControllerService ncs;
+    private List<URL> binaryURLs;
+
+    public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs) {
+        this.deploymentId = deploymentId;
+        this.ncs = ncs;
+        this.binaryURLs = binaryURLs;
+    }
+
+    @Override
+    public void run() {
+        ncs.getApplicationContext();
+        // add a new class path
+        try {
+            IClusterController ccs = ncs.getClusterController();
+            ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}