changes for dynamic deployment according to Vinayak's suggestions
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3302 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 1b079d6..6ee719b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -16,6 +16,7 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.messages.IMessageBroker;
/**
@@ -35,4 +36,7 @@
public void setMessageBroker(IMessageBroker messageBroker);
public IMessageBroker getMessageBroker();
+
+ public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index e689e81..fd78505 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -37,7 +38,7 @@
GET_DATASET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO,
- DEPLOY_BINARY
+ CLI_DEPLOY_BINARY
}
public abstract static class Function implements Serializable {
@@ -204,21 +205,27 @@
}
}
- public static class DeployBinaryFunction extends Function {
+ public static class CliDeployBinaryFunction extends Function {
private static final long serialVersionUID = 1L;
private final List<URL> binaryURLs;
+ private final DeploymentId deploymentId;
- public DeployBinaryFunction(List<URL> binaryURLs) {
+ public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId) {
this.binaryURLs = binaryURLs;
+ this.deploymentId = deploymentId;
}
@Override
public FunctionId getFunctionId() {
- return FunctionId.DEPLOY_BINARY;
+ return FunctionId.CLI_DEPLOY_BINARY;
}
public List<URL> getBinaryURLs() {
return binaryURLs;
}
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 05712ac..edb4ca8 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -20,6 +20,7 @@
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -83,9 +84,16 @@
}
@Override
- public void deployBinary(List<URL> binaryURLs) throws Exception {
- HyracksClientInterfaceFunctions.DeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.DeployBinaryFunction(
- binaryURLs);
+ public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception {
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
+ binaryURLs, null);
+ return (DeploymentId) rpci.call(ipcHandle, dbf);
+ }
+
+ @Override
+ public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(
+ binaryURLs, deploymentId);
rpci.call(ipcHandle, dbf);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 03ddc84..a0c3a18 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -20,6 +20,7 @@
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -39,6 +40,8 @@
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
public ClusterTopology getClusterTopology() throws Exception;
-
- public void deployBinary(List<URL> binaryURLs) throws Exception;
+
+ public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception;
+
+ public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
index 7482ac9..b642bc7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -23,7 +23,7 @@
if (!(o instanceof DeploymentId)) {
return false;
}
- return ((DeploymentId) o).equals(deploymentKey);
+ return ((DeploymentId) o).deploymentKey.equals(deploymentKey);
}
@Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
new file mode 100644
index 0000000..6e0b00f
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.job;
+
+import java.net.URL;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public interface IJobSerializerDeserializer {
+
+ public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException;
+
+ public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException;
+
+ public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException;
+
+ public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException;
+
+ public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
new file mode 100644
index 0000000..0ce2346
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializerContainer.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+
+public interface IJobSerializerDeserializerContainer {
+
+ /**
+ * Get the IJobSerializerDeserializer implementation instance for a specific deployment id
+ *
+ * @param deploymentId
+ * @return
+ */
+ public IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId);
+
+ /**
+ * Add a deployment with the job serializer deserializer
+ *
+ * @param deploymentId
+ * @param jobSerDe
+ */
+ public void addJobSerializerDeserializer(DeploymentId deploymentId, IJobSerializerDeserializer jobSerDe);
+
+ /**
+ * Remove a deployment
+ *
+ * @param deploymentId
+ */
+ public void removeJobSerializerDeserializer(DeploymentId deploymentId);
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
new file mode 100644
index 0000000..cccee37
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.job;
+
+import java.net.URL;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+
+public class JobSerializerDeserializer implements IJobSerializerDeserializer {
+
+ @Override
+ public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException {
+ try {
+ return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException {
+ try {
+ return JavaSerializationUtils.serialize(jobSpec);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException {
+ try {
+ return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException {
+ try {
+ return JavaSerializationUtils.serialize(acg);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public void addClassPathURLs(List<URL> binaryURLs) {
+ throw new UnsupportedOperationException("Not supported by " + this.getClass().getName());
+ }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
new file mode 100644
index 0000000..ed26304
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializerContainer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.job;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+
+public class JobSerializerDeserializerContainer implements IJobSerializerDeserializerContainer {
+
+ private IJobSerializerDeserializer defaultJobSerDe = new JobSerializerDeserializer();
+ private Map<DeploymentId, IJobSerializerDeserializer> jobSerializerDeserializerMap = new HashMap<DeploymentId, IJobSerializerDeserializer>();
+
+ @Override
+ public synchronized IJobSerializerDeserializer getJobSerializerDeerializer(DeploymentId deploymentId) {
+ if (deploymentId == null) {
+ return defaultJobSerDe;
+ }
+ IJobSerializerDeserializer jobSerDe = jobSerializerDeserializerMap.get(deploymentId);
+ if (jobSerDe == null) {
+ return defaultJobSerDe;
+ }
+ return jobSerDe;
+ }
+
+ @Override
+ public synchronized void addJobSerializerDeserializer(DeploymentId deploymentId, IJobSerializerDeserializer jobSerDe) {
+ jobSerializerDeserializerMap.put(deploymentId, jobSerDe);
+ }
+
+ @Override
+ public synchronized void removeJobSerializerDeserializer(DeploymentId deploymentId) {
+ jobSerializerDeserializerMap.remove(deploymentId);
+ }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index 764348f..cb90f4c 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -33,6 +33,19 @@
return baos.toByteArray();
}
+ public static byte[] serialize(Serializable jobSpec, ClassLoader classLoader) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ oos.writeObject(jobSpec);
+ return baos.toByteArray();
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
if (bytes == null) {
return null;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index f480ced..fa1882a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -382,9 +382,10 @@
return;
}
- case DEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.DeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.DeployBinaryFunction) fn;
- workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs()));
+ case CLI_DEPLOY_BINARY: {
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+ workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(), dbf
+ .getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
return;
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
index 355289b..26b30c5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -26,23 +26,41 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.IPCResponder;
public class CliDeployBinaryWork extends AbstractWork {
private ClusterControllerService ccs;
private List<URL> binaryURLs;
+ private DeploymentId deploymentId;
+ private IPCResponder<DeploymentId> callback;
- public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs) {
+ public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs, DeploymentId deploymentId,
+ IPCResponder<DeploymentId> callback) {
this.ccs = ncs;
this.binaryURLs = binaryURLs;
+ this.deploymentId = deploymentId;
}
@Override
public void run() {
try {
+ if (deploymentId == null) {
+ deploymentId = new DeploymentId(UUID.randomUUID().toString());
+ }
+ /**
+ * Deploy for the cluster controller
+ */
+ DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getApplicationContext()
+ .getJobSerializerDeserializerContainer());
+
+ /**
+ * Deploy for the node controllers
+ */
Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap();
- DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
+
Set<String> nodeIds = new TreeSet<String>();
for (String nc : nodeControllerStateMap.keySet()) {
nodeIds.add(nc);
@@ -62,8 +80,9 @@
*/
dRun.waitForCompletion();
ccs.removeDeploymentRun(deploymentId);
+ callback.setValue(deploymentId);
} catch (Exception e) {
- throw new RuntimeException(e);
+ callback.setException(e);
}
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 5251584..58ae79e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -18,6 +18,8 @@
import java.io.Serializable;
import edu.uci.ics.hyracks.api.application.IApplicationContext;
+import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
+import edu.uci.ics.hyracks.api.job.JobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.messages.IMessageBroker;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -25,6 +27,7 @@
protected ServerContext serverCtx;
protected Serializable distributedState;
protected IMessageBroker messageBroker;
+ protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
public ApplicationContext(ServerContext serverCtx) throws IOException {
this.serverCtx = serverCtx;
@@ -44,4 +47,9 @@
public IMessageBroker getMessageBroker() {
return this.messageBroker;
}
+
+ @Override
+ public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+ return this.jobSerDeContainer;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
new file mode 100644
index 0000000..ccf38c2
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.control.common.deployment;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+
+public class ClassLoaderJobSerializerDeserializer implements IJobSerializerDeserializer {
+
+ private URLClassLoader classLoader;
+
+ @Override
+ public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException {
+ try {
+ if (classLoader == null) {
+ return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
+ }
+ return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes, classLoader);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException {
+ try {
+ if (classLoader == null) {
+ return JavaSerializationUtils.serialize(jobSpec);
+ }
+ return JavaSerializationUtils.serialize(jobSpec, classLoader);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException {
+ try {
+ if (classLoader == null) {
+ return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes);
+ }
+ return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes, classLoader);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException {
+ try {
+ if (classLoader == null) {
+ return JavaSerializationUtils.serialize(acg);
+ }
+ return JavaSerializationUtils.serialize(acg, classLoader);
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ @Override
+ public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException {
+ Collections.sort(binaryURLs, new Comparator<URL>() {
+ @Override
+ public int compare(URL o1, URL o2) {
+ return o1.getFile().compareTo(o2.getFile());
+ }
+ });
+ try {
+ if (classLoader == null) {
+ /**crate a new classloader*/
+ URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
+ classLoader = new URLClassLoader(urls, this.getClass().getClassLoader());
+ } else {
+ /**add URLs to the existing classloader*/
+ Object[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
+ Method method = classLoader.getClass().getDeclaredMethod("addURL", new Class[] { URL.class });
+ method.setAccessible(true);
+ method.invoke(classLoader, urls);
+ }
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
new file mode 100644
index 0000000..b61f101
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.hyracks.control.common.deployment;
+
+import java.net.URL;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
+import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
+
+public class DeploymentUtils {
+
+ public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container)
+ throws HyracksException {
+ IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeerializer(deploymentId);
+ if (jobSerDe == null) {
+ jobSerDe = new ClassLoaderJobSerializerDeserializer();
+ container.addJobSerializerDeserializer(deploymentId, jobSerDe);
+ }
+ jobSerDe.addClassPathURLs(urls);
+ }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
index c56a2e5..6c20722 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DeployBinaryWork.java
@@ -6,6 +6,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
@@ -23,9 +24,9 @@
@Override
public void run() {
- ncs.getApplicationContext();
- // add a new class path
try {
+ DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getApplicationContext()
+ .getJobSerializerDeserializerContainer());
IClusterController ccs = ncs.getClusterController();
ccs.notifyDeployBinary(deploymentId, ncs.getId(), DeploymentStatus.SUCCEED);
} catch (Exception e) {
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 0c64d7e..285ab1f 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
import edu.uci.ics.hyracks.api.messages.IMessageBroker;
public class TestNCApplicationContext implements INCApplicationContext {
@@ -68,4 +69,10 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
\ No newline at end of file