changes for dynamic jar deployment
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3366 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index fd78505..74cee5f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -80,10 +80,18 @@
private final byte[] acggfBytes;
private final EnumSet<JobFlag> jobFlags;
+ private final DeploymentId deploymentId;
public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
+ this.deploymentId = null;
+ }
+
+ public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+ this.acggfBytes = acggfBytes;
+ this.jobFlags = jobFlags;
+ this.deploymentId = deploymentId;
}
@Override
@@ -98,6 +106,10 @@
public EnumSet<JobFlag> getJobFlags() {
return jobFlags;
}
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
}
public static class GetDatasetDirectoryServiceInfoFunction extends Function {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index edb4ca8..70c4b8c 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -59,6 +59,13 @@
}
@Override
+ public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+ deploymentId, acggfBytes, jobFlags);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
return (NetworkAddress) rpci.call(ipcHandle, gddsf);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index a0c3a18..e70a206 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -44,4 +44,6 @@
public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception;
public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
+
+ public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index a2ee977..f0b47b1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
@@ -31,5 +32,5 @@
public IDatasetPartitionManager getDatasetPartitionManager();
- public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
+ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index 6e0b00f..d3188d6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.job;
+import java.io.Serializable;
import java.net.URL;
import java.util.List;
@@ -22,13 +23,9 @@
public interface IJobSerializerDeserializer {
- public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException;
+ public Object deserialize(byte[] bytes) throws HyracksException;
- public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException;
-
- public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException;
-
- public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException;
+ public byte[] serialize(Serializable job) throws HyracksException;
public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
index cccee37..b7e7920 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.job;
+import java.io.Serializable;
import java.net.URL;
import java.util.List;
@@ -24,36 +25,18 @@
public class JobSerializerDeserializer implements IJobSerializerDeserializer {
@Override
- public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException {
+ public Object deserialize(byte[] jsBytes) throws HyracksException {
try {
- return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
+ return JavaSerializationUtils.deserialize(jsBytes);
} catch (Exception e) {
throw new HyracksException(e);
}
}
@Override
- public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException {
+ public byte[] serialize(Serializable obj) throws HyracksException {
try {
- return JavaSerializationUtils.serialize(jobSpec);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException {
- try {
- return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException {
- try {
- return JavaSerializationUtils.serialize(acg);
+ return JavaSerializationUtils.serialize(obj);
} catch (Exception e) {
throw new HyracksException(e);
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index fa1882a..d92315d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -334,8 +334,8 @@
case START_JOB: {
HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
- workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getACGGFBytes(), sjf
- .getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+ workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(), sjf
+ .getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
return;
}
@@ -494,7 +494,7 @@
case SEND_APPLICATION_MESSAGE: {
CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
- .getNodeId()));
+ .getDeploymentId(), rsf.getNodeId()));
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 33e1ff6..827a71e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -29,6 +29,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -45,6 +46,8 @@
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
public class JobRun implements IJobStatusConditionVariable {
+ private final DeploymentId deploymentId;
+
private final JobId jobId;
private final IActivityClusterGraphGenerator acgg;
@@ -81,8 +84,9 @@
private Exception pendingException;
- public JobRun(ClusterControllerService ccs, JobId jobId, IActivityClusterGraphGenerator acgg,
- EnumSet<JobFlag> jobFlags) {
+ public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+ IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+ this.deploymentId = deploymentId;
this.jobId = jobId;
this.acgg = acgg;
this.acg = acgg.initialize();
@@ -96,6 +100,10 @@
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
}
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+
public JobId getJobId() {
return jobId;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 34b7dc7..f3d7d34 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -427,6 +428,7 @@
}
private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
+ final DeploymentId deploymentId = jobRun.getDeploymentId();
final JobId jobId = jobRun.getJobId();
final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
@@ -443,8 +445,8 @@
}
try {
byte[] jagBytes = changed ? JavaSerializationUtils.serialize(acg) : null;
- node.getNodeController().startTasks(jobId, jagBytes, taskDescriptors, connectorPolicies,
- jobRun.getFlags());
+ node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
+ connectorPolicies, jobRun.getFlags());
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index b880c8a..50b3e30 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -14,14 +14,14 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
-import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.messages.IMessage;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
/**
@@ -31,11 +31,13 @@
private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
private byte[] message;
+ private DeploymentId deploymentId;
private String nodeId;
private ClusterControllerService ccs;
- public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, String nodeId) {
+ public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) {
this.ccs = ccs;
+ this.deploymentId = deploymentId;
this.nodeId = nodeId;
this.message = message;
}
@@ -44,17 +46,16 @@
public void run() {
final ICCApplicationContext ctx = ccs.getApplicationContext();
try {
- final IMessage data = (IMessage) JavaSerializationUtils.deserialize(message);
+ final IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {
ctx.getMessageBroker().receivedMessage(data, nodeId);
}
});
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.log(Level.WARNING, "Error in stats reporting", e);
- } catch (ClassNotFoundException e) {
- Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+ throw new RuntimeException(e);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index f7d6c4c..7ecdd16 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,15 +16,16 @@
import java.util.EnumSet;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
@@ -32,11 +33,13 @@
private final ClusterControllerService ccs;
private final byte[] acggfBytes;
private final EnumSet<JobFlag> jobFlags;
+ private final DeploymentId deploymentId;
private final JobId jobId;
private final IResultCallback<JobId> callback;
- public JobStartWork(ClusterControllerService ccs, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId,
- IResultCallback<JobId> callback) {
+ public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
+ EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback) {
+ this.deploymentId = deploymentId;
this.jobId = jobId;
this.ccs = ccs;
this.acggfBytes = acggfBytes;
@@ -48,10 +51,10 @@
protected void doRun() throws Exception {
try {
final CCApplicationContext appCtx = ccs.getApplicationContext();
- IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) JavaSerializationUtils
- .deserialize(acggfBytes);
+ IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
+ .deserialize(acggfBytes, deploymentId, appCtx);
IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
- JobRun run = new JobRun(ccs, jobId, acgg, jobFlags);
+ JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
run.setStatus(JobStatus.INITIALIZED, null);
ccs.getActiveRunMap().put(jobId, run);
appCtx.notifyJobCreation(jobId, acggf);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 94e9f92..47a5c09 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -51,7 +51,7 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception;
+ public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
int nPartitions, NetworkAddress networkAddress) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 0cfd161..49ea21e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -31,7 +31,7 @@
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
- public void startTasks(JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index ccf38c2..0e4d366 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -1,5 +1,6 @@
package edu.uci.ics.hyracks.control.common.deployment;
+import java.io.Serializable;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
@@ -8,7 +9,6 @@
import java.util.List;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
@@ -18,7 +18,7 @@
private URLClassLoader classLoader;
@Override
- public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException {
+ public JobSpecification deserialize(byte[] jsBytes) throws HyracksException {
try {
if (classLoader == null) {
return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
@@ -30,7 +30,7 @@
}
@Override
- public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException {
+ public byte[] serialize(Serializable jobSpec) throws HyracksException {
try {
if (classLoader == null) {
return JavaSerializationUtils.serialize(jobSpec);
@@ -42,30 +42,6 @@
}
@Override
- public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException {
- try {
- if (classLoader == null) {
- return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes);
- }
- return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes, classLoader);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException {
- try {
- if (classLoader == null) {
- return JavaSerializationUtils.serialize(acg);
- }
- return JavaSerializationUtils.serialize(acg, classLoader);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException {
Collections.sort(binaryURLs, new Comparator<URL>() {
@Override
@@ -75,11 +51,11 @@
});
try {
if (classLoader == null) {
- /**crate a new classloader*/
+ /** crate a new classloader */
URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
classLoader = new URLClassLoader(urls, this.getClass().getClassLoader());
} else {
- /**add URLs to the existing classloader*/
+ /** add URLs to the existing classloader */
Object[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
Method method = classLoader.getClass().getDeclaredMethod("addURL", new Class[] { URL.class });
method.setAccessible(true);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index b61f101..68fae70 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -3,10 +3,12 @@
import java.net.URL;
import java.util.List;
+import edu.uci.ics.hyracks.api.application.IApplicationContext;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
public class DeploymentUtils {
@@ -20,4 +22,17 @@
jobSerDe.addClassPathURLs(urls);
}
+ public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
+ throws HyracksException {
+ try {
+ IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+ IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
+ .getJobSerializerDeerializer(deploymentId);
+ Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
+ return obj;
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index d6ed7ec..bb84e4f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -93,8 +93,13 @@
public static class SendApplicationMessageFunction extends Function {
private static final long serialVersionUID = 1L;
private byte[] serializedMessage;
+ private DeploymentId deploymentId;
private String nodeId;
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+
public String getNodeId() {
return nodeId;
}
@@ -107,8 +112,9 @@
return serializedMessage;
}
- public SendApplicationMessageFunction(byte[] data, String nodeId) {
+ public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, String nodeId) {
this.serializedMessage = data;
+ this.deploymentId = deploymentId;
this.nodeId = nodeId;
}
@@ -587,14 +593,17 @@
public static class StartTasksFunction extends Function {
private static final long serialVersionUID = 1L;
+ private final DeploymentId deploymentId;
private final JobId jobId;
private final byte[] planBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
private final EnumSet<JobFlag> flags;
- public StartTasksFunction(JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
+ this.deploymentId = deploymentId;
this.jobId = jobId;
this.planBytes = planBytes;
this.taskDescriptors = taskDescriptors;
@@ -607,6 +616,10 @@
return FunctionId.START_TASKS;
}
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+
public JobId getJobId() {
return jobId;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 10cd935..5ed65cc 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -105,8 +105,9 @@
}
@Override
- public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception {
- CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, nodeId);
+ public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
+ CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+ deploymentId, nodeId);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index aaed36e..6d6ef41 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -40,10 +40,11 @@
}
@Override
- public void startTasks(JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
- CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(jobId, planBytes, taskDescriptors,
- connectorPolicies, flags);
+ CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
+ taskDescriptors, connectorPolicies, flags);
ipcHandle.send(-1, stf, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 64ff8c8..c6eee89 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -445,13 +446,13 @@
case SEND_APPLICATION_MESSAGE: {
CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
- .getNodeId()));
+ .getDeploymentId(), amf.getNodeId()));
return;
}
case START_TASKS: {
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
- queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getJobId(), stf.getPlanBytes(),
- stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), stf
+ .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
return;
}
@@ -498,8 +499,8 @@
}
}
- public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception {
- ccs.sendApplicationMessageToCC(data, nodeId);
+ public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
+ ccs.sendApplicationMessageToCC(data, deploymentId, nodeId);
}
public IDatasetPartitionManager getDatasetPartitionManager() {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index d6ea111..ac76c16 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -354,7 +355,7 @@
}
@Override
- public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
- this.ncs.sendApplicationMessageToCC(message, nodeId);
+ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
+ this.ncs.sendApplicationMessageToCC(message, deploymentId, nodeId);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
index a1499b8..19a5a81 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -14,12 +14,12 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
-import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.messages.IMessage;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -30,11 +30,13 @@
public class ApplicationMessageWork extends AbstractWork {
private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
private byte[] message;
+ private DeploymentId deploymentId;
private String nodeId;
private NodeControllerService ncs;
- public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String nodeId) {
+ public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) {
this.ncs = ncs;
+ this.deploymentId = deploymentId;
this.nodeId = nodeId;
this.message = message;
}
@@ -43,16 +45,15 @@
public void run() {
NCApplicationContext ctx = ncs.getApplicationContext();
try {
- IMessage data = (IMessage) JavaSerializationUtils.deserialize(message);
+ IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);;
if (ctx.getMessageBroker() != null) {
ctx.getMessageBroker().receivedMessage(data, nodeId);
} else {
LOGGER.log(Level.WARNING, "Messsage was sent, but no Message Broker set!");
}
- } catch (IOException e) {
+ } catch (Exception e) {
Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
- } catch (ClassNotFoundException e) {
- Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+ throw new RuntimeException(e);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index fffecc2..939276a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -36,13 +36,14 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
@@ -60,6 +61,8 @@
private final NodeControllerService ncs;
+ private final DeploymentId deploymentId;
+
private final JobId jobId;
private final byte[] acgBytes;
@@ -70,10 +73,11 @@
private final EnumSet<JobFlag> flags;
- public StartTasksWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes,
+ public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
this.ncs = ncs;
+ this.deploymentId = deploymentId;
this.jobId = jobId;
this.acgBytes = acgBytes;
this.taskDescriptors = taskDescriptors;
@@ -86,7 +90,7 @@
try {
NCApplicationContext appCtx = ncs.getApplicationContext();
final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
- : (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes));
+ : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx));
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 0ca93b2..8311ebd 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -107,7 +108,7 @@
}
@Override
- public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
// TODO Auto-generated method stub
}