revised skeleton for dynamic deployment
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3300 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 642f550..e689e81 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -203,10 +203,9 @@
return FunctionId.GET_CLUSTER_TOPOLOGY;
}
}
-
+
public static class DeployBinaryFunction extends Function {
private static final long serialVersionUID = 1L;
-
private final List<URL> binaryURLs;
public DeployBinaryFunction(List<URL> binaryURLs) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
new file mode 100644
index 0000000..7482ac9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.api.deployment;
+
+import java.io.Serializable;
+
+public class DeploymentId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String deploymentKey;
+
+ public DeploymentId(String deploymentKey) {
+ this.deploymentKey = deploymentKey;
+ }
+
+ public int hashCode() {
+ return deploymentKey.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof DeploymentId)) {
+ return false;
+ }
+ return ((DeploymentId) o).equals(deploymentKey);
+ }
+
+ @Override
+ public String toString() {
+ return deploymentKey;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index b10d266..f480ced 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -49,7 +50,7 @@
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
-import edu.uci.ics.hyracks.control.cc.work.DeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
@@ -59,6 +60,7 @@
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
+import edu.uci.ics.hyracks.control.cc.work.NotifyDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
@@ -74,6 +76,7 @@
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
@@ -127,6 +130,8 @@
private long jobCounter;
+ private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
+
public ClusterControllerService(final CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
@@ -173,6 +178,8 @@
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
jobCounter = 0;
+
+ deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -377,7 +384,7 @@
case DEPLOY_BINARY: {
HyracksClientInterfaceFunctions.DeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.DeployBinaryFunction) fn;
- workQueue.schedule(new DeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs()));
+ workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs()));
return;
}
}
@@ -421,6 +428,13 @@
return;
}
+ case NOTIFY_DEPLOY_BINARY: {
+ CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
+ workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this,
+ ndbf.getDeploymentId(), ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+ return;
+ }
+
case REPORT_PROFILE: {
CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
@@ -503,4 +517,32 @@
LOGGER.warning("Unknown function: " + fn.getFunctionId());
}
}
+
+ /**
+ * Add a deployment run
+ *
+ * @param deploymentKey
+ * @param nodeControllerIds
+ */
+ public synchronized void addDeploymentRun(DeploymentId deploymentKey, DeploymentRun dRun) {
+ deploymentRunMap.put(deploymentKey, dRun);
+ }
+
+ /**
+ * Get a deployment run
+ *
+ * @param deploymentKey
+ */
+ public synchronized DeploymentRun getDeploymentRun(DeploymentId deploymentKey) {
+ return deploymentRunMap.get(deploymentKey);
+ }
+
+ /**
+ * Remove a deployment run
+ *
+ * @param deploymentKey
+ */
+ public synchronized void removeDeploymentRun(DeploymentId deploymentKey) {
+ deploymentRunMap.remove(deploymentKey);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
new file mode 100644
index 0000000..355289b
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class CliDeployBinaryWork extends AbstractWork {
+
+ private ClusterControllerService ccs;
+ private List<URL> binaryURLs;
+
+ public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs) {
+ this.ccs = ncs;
+ this.binaryURLs = binaryURLs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
+ DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
+ Set<String> nodeIds = new TreeSet<String>();
+ for (String nc : nodeControllerStateMap.keySet()) {
+ nodeIds.add(nc);
+ }
+ DeploymentRun dRun = new DeploymentRun(nodeIds);
+ ccs.addDeploymentRun(deploymentId, dRun);
+
+ /***
+ * deploy binaries to each node controller
+ */
+ for (NodeControllerState ncs : nodeControllerStateMap.values()) {
+ ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
+ }
+
+ /**
+ * wait for completion
+ */
+ dRun.waitForCompletion();
+ ccs.removeDeploymentRun(deploymentId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/DeployBinaryWork.java
deleted file mode 100644
index 5aeba4b..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/DeployBinaryWork.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.control.cc.work;
-
-import java.net.URL;
-import java.util.List;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class DeployBinaryWork extends AbstractWork {
-
- private ClusterControllerService ccs;
-
- public DeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs) {
- this.ccs = ncs;
- }
-
- @Override
- public void run() {
-
- }
-
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
new file mode 100644
index 0000000..903cad3
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+public class NotifyDeployBinaryWork extends AbstractWork {
+
+ private final ClusterControllerService ccs;
+ private final String nodeId;
+ private final DeploymentId deploymentId;
+ private DeploymentStatus deploymentStatus;
+
+ public NotifyDeployBinaryWork(ClusterControllerService ccs, DeploymentId deploymentId, String nodeId,
+ DeploymentStatus deploymentStatus) {
+ this.ccs = ccs;
+ this.nodeId = nodeId;
+ this.deploymentId = deploymentId;
+ this.deploymentStatus = deploymentStatus;
+
+ }
+
+ @Override
+ public void run() {
+ // notify that one NC is deployed
+ DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
+ dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 924a8be..94e9f92 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -19,8 +19,10 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -39,7 +41,7 @@
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
- public void notifyDeployBinary(String nodeId) throws Exception;
+ public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 1b7e8f6..0cfd161 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -39,5 +40,5 @@
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
- public void deployBinary(List<URL> url) throws Exception;
+ public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentRun.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentRun.java
new file mode 100644
index 0000000..519905c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentRun.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.control.common.deployment;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+public class DeploymentRun implements IDeploymentStatusConditionVariable {
+
+ private DeploymentStatus deploymentStatus = DeploymentStatus.FAIL;
+ private final Set<String> deploymentNodeIds = new TreeSet<String>();
+
+ public DeploymentRun(Set<String> nodeIds) {
+ deploymentNodeIds.addAll(nodeIds);
+ }
+
+ public synchronized void notifyDeploymentStatus(String nodeId, DeploymentStatus status) {
+ deploymentNodeIds.remove(nodeId);
+ if (deploymentNodeIds.size() == 0) {
+ deploymentStatus = DeploymentStatus.SUCCEED;
+ notifyAll();
+ }
+ }
+
+ @Override
+ public synchronized DeploymentStatus waitForCompletion() throws Exception {
+ wait();
+ return deploymentStatus;
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentStatus.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentStatus.java
new file mode 100644
index 0000000..3690518
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentStatus.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.deployment;
+
+public enum DeploymentStatus {
+ SUCCEED,
+ FAIL
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/IDeploymentStatusConditionVariable.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/IDeploymentStatusConditionVariable.java
new file mode 100644
index 0000000..06e398e
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/IDeploymentStatusConditionVariable.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.deployment;
+
+public interface IDeploymentStatusConditionVariable {
+
+ public DeploymentStatus waitForCompletion() throws Exception;
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index c4968eb..d6ed7ec 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -38,12 +38,14 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -81,7 +83,8 @@
SEND_APPLICATION_MESSAGE,
GET_NODE_CONTROLLERS_INFO,
GET_NODE_CONTROLLERS_INFO_RESPONSE,
-
+
+ DEPLOY_BINARY,
NOTIFY_DEPLOY_BINARY,
OTHER
@@ -753,13 +756,42 @@
}
}
+ public static class DeployBinaryFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final List<URL> binaryURLs;
+ private final DeploymentId deploymentId;
+
+ public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs) {
+ this.binaryURLs = binaryURLs;
+ this.deploymentId = deploymentId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DEPLOY_BINARY;
+ }
+
+ public List<URL> getBinaryURLs() {
+ return binaryURLs;
+ }
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+ }
+
public static class NotifyDeployBinaryFunction extends Function {
private static final long serialVersionUID = 1L;
private final String nodeId;
+ private final DeploymentId deploymentId;
+ private final DeploymentStatus deploymentStatus;
- public NotifyDeployBinaryFunction(String nodeId) {
+ public NotifyDeployBinaryFunction(DeploymentId deploymentId, String nodeId, DeploymentStatus deploymentStatus) {
this.nodeId = nodeId;
+ this.deploymentId = deploymentId;
+ this.deploymentStatus = deploymentStatus;
}
@Override
@@ -770,6 +802,14 @@
public String getNodeId() {
return nodeId;
}
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+
+ public DeploymentStatus getDeploymentStatus() {
+ return deploymentStatus;
+ }
}
public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 49ab53b..10cd935 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -19,9 +19,11 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -70,8 +72,9 @@
}
@Override
- public void notifyDeployBinary(String nodeId) throws Exception {
- CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(nodeId);
+ public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception {
+ CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
+ nodeId, status);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 33f19ca..aaed36e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -19,11 +19,11 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -67,9 +67,8 @@
}
@Override
- public void deployBinary(List<URL> binaryURLs) throws Exception {
- HyracksClientInterfaceFunctions.DeployBinaryFunction rpaf = new HyracksClientInterfaceFunctions.DeployBinaryFunction(
- binaryURLs);
+ public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
+ CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
ipcHandle.send(-1, rpaf, null);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a97ec84..64ff8c8 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -73,7 +73,7 @@
import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
-import edu.uci.ics.hyracks.control.nc.work.DeployBinaryNotificationWork;
+import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -485,10 +485,11 @@
setNodeControllersInfo(gncirf.getNodeControllerInfos());
return;
}
-
- case NOTIFY_DEPLOY_BINARY: {
- CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
- queue.schedule(new DeployBinaryNotificationWork(NodeControllerService.this, ndbf.getNodeId()));
+
+ case DEPLOY_BINARY: {
+ CCNCFunctions.DeployBinaryFunction ndbf = (CCNCFunctions.DeployBinaryFunction) fn;
+ queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(), ndbf
+ .getBinaryURLs()));
return;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryNotificationWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryNotificationWork.java
deleted file mode 100644
index 15a2095..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryNotificationWork.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.control.nc.work;
-
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-
-public class DeployBinaryNotificationWork extends AbstractWork {
-
- private NodeControllerService ccs;
-
- public DeployBinaryNotificationWork(NodeControllerService ccs, String nodeId) {
- this.ccs = ccs;
- }
-
- @Override
- public void run() {
-
- }
-
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
new file mode 100644
index 0000000..c56a2e5
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.URL;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class DeployBinaryWork extends AbstractWork {
+
+ private DeploymentId deploymentId;
+ private NodeControllerService ncs;
+ private List<URL> binaryURLs;
+
+ public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs) {
+ this.deploymentId = deploymentId;
+ this.ncs = ncs;
+ this.binaryURLs = binaryURLs;
+ }
+
+ @Override
+ public void run() {
+ ncs.getApplicationContext();
+ // add a new class path
+ try {
+ IClusterController ccs = ncs.getClusterController();
+ ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}