changes for dynamic jar deployment

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_dynamic_deployment@3366 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index fd78505..74cee5f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -80,10 +80,18 @@
 
         private final byte[] acggfBytes;
         private final EnumSet<JobFlag> jobFlags;
+        private final DeploymentId deploymentId;
 
         public StartJobFunction(byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
             this.acggfBytes = acggfBytes;
             this.jobFlags = jobFlags;
+            this.deploymentId = null;
+        }
+
+        public StartJobFunction(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+            this.acggfBytes = acggfBytes;
+            this.jobFlags = jobFlags;
+            this.deploymentId = deploymentId;
         }
 
         @Override
@@ -98,6 +106,10 @@
         public EnumSet<JobFlag> getJobFlags() {
             return jobFlags;
         }
+
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
     }
 
     public static class GetDatasetDirectoryServiceInfoFunction extends Function {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index edb4ca8..70c4b8c 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -59,6 +59,13 @@
     }
 
     @Override
+    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
+        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+                deploymentId, acggfBytes, jobFlags);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
     public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction gddsf = new HyracksClientInterfaceFunctions.GetDatasetDirectoryServiceInfoFunction();
         return (NetworkAddress) rpci.call(ipcHandle, gddsf);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index a0c3a18..e70a206 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -44,4 +44,6 @@
     public DeploymentId deployBinary(List<URL> binaryURLs) throws Exception;
 
     public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
+
+    public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index a2ee977..f0b47b1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -16,6 +16,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
@@ -31,5 +32,5 @@
 
     public IDatasetPartitionManager getDatasetPartitionManager();
 
-    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
+    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId, String nodeId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
index 6e0b00f..d3188d6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSerializerDeserializer.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.hyracks.api.job;
 
+import java.io.Serializable;
 import java.net.URL;
 import java.util.List;
 
@@ -22,13 +23,9 @@
 
 public interface IJobSerializerDeserializer {
 
-    public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException;
+    public Object deserialize(byte[] bytes) throws HyracksException;
 
-    public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException;
-
-    public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException;
-
-    public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException;
+    public byte[] serialize(Serializable job) throws HyracksException;
 
     public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
index cccee37..b7e7920 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSerializerDeserializer.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.hyracks.api.job;
 
+import java.io.Serializable;
 import java.net.URL;
 import java.util.List;
 
@@ -24,36 +25,18 @@
 public class JobSerializerDeserializer implements IJobSerializerDeserializer {
 
     @Override
-    public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException {
+    public Object deserialize(byte[] jsBytes) throws HyracksException {
         try {
-            return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
+            return JavaSerializationUtils.deserialize(jsBytes);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
     }
 
     @Override
-    public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException {
+    public byte[] serialize(Serializable obj) throws HyracksException {
         try {
-            return JavaSerializationUtils.serialize(jobSpec);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
-    public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException {
-        try {
-            return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
-    public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException {
-        try {
-            return JavaSerializationUtils.serialize(acg);
+            return JavaSerializationUtils.serialize(obj);
         } catch (Exception e) {
             throw new HyracksException(e);
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index fa1882a..d92315d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -334,8 +334,8 @@
                 case START_JOB: {
                     HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
-                    workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getACGGFBytes(), sjf
-                            .getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+                    workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(), sjf
+                            .getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
                     return;
                 }
 
@@ -494,7 +494,7 @@
                 case SEND_APPLICATION_MESSAGE: {
                     CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
                     workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
-                            .getNodeId()));
+                            .getDeploymentId(), rsf.getNodeId()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 33e1ff6..827a71e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -45,6 +46,8 @@
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 
 public class JobRun implements IJobStatusConditionVariable {
+    private final DeploymentId deploymentId;
+
     private final JobId jobId;
 
     private final IActivityClusterGraphGenerator acgg;
@@ -81,8 +84,9 @@
 
     private Exception pendingException;
 
-    public JobRun(ClusterControllerService ccs, JobId jobId, IActivityClusterGraphGenerator acgg,
-            EnumSet<JobFlag> jobFlags) {
+    public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
+            IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
+        this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.acgg = acgg;
         this.acg = acgg.initialize();
@@ -96,6 +100,10 @@
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
     }
 
+    public DeploymentId getDeploymentId() {
+        return deploymentId;
+    }
+
     public JobId getJobId() {
         return jobId;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 34b7dc7..f3d7d34 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
@@ -427,6 +428,7 @@
     }
 
     private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
+        final DeploymentId deploymentId = jobRun.getDeploymentId();
         final JobId jobId = jobRun.getJobId();
         final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
@@ -443,8 +445,8 @@
                 }
                 try {
                     byte[] jagBytes = changed ? JavaSerializationUtils.serialize(acg) : null;
-                    node.getNodeController().startTasks(jobId, jagBytes, taskDescriptors, connectorPolicies,
-                            jobRun.getFlags());
+                    node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
+                            connectorPolicies, jobRun.getFlags());
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index b880c8a..50b3e30 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -14,14 +14,14 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.messages.IMessage;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 /**
@@ -31,11 +31,13 @@
 
     private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
     private byte[] message;
+    private DeploymentId deploymentId;
     private String nodeId;
     private ClusterControllerService ccs;
 
-    public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, String nodeId) {
+    public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) {
         this.ccs = ccs;
+        this.deploymentId = deploymentId;
         this.nodeId = nodeId;
         this.message = message;
     }
@@ -44,17 +46,16 @@
     public void run() {
         final ICCApplicationContext ctx = ccs.getApplicationContext();
         try {
-            final IMessage data = (IMessage) JavaSerializationUtils.deserialize(message);
+            final IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);
             ccs.getExecutor().execute(new Runnable() {
                 @Override
                 public void run() {
                     ctx.getMessageBroker().receivedMessage(data, nodeId);
                 }
             });
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Error in stats reporting", e);
-        } catch (ClassNotFoundException e) {
-            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in stats reporting", e);
+            throw new RuntimeException(e);
         }
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index f7d6c4c..7ecdd16 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,15 +16,16 @@
 
 import java.util.EnumSet;
 
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
@@ -32,11 +33,13 @@
     private final ClusterControllerService ccs;
     private final byte[] acggfBytes;
     private final EnumSet<JobFlag> jobFlags;
+    private final DeploymentId deploymentId;
     private final JobId jobId;
     private final IResultCallback<JobId> callback;
 
-    public JobStartWork(ClusterControllerService ccs, byte[] acggfBytes, EnumSet<JobFlag> jobFlags, JobId jobId,
-            IResultCallback<JobId> callback) {
+    public JobStartWork(ClusterControllerService ccs, DeploymentId deploymentId, byte[] acggfBytes,
+            EnumSet<JobFlag> jobFlags, JobId jobId, IResultCallback<JobId> callback) {
+        this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.ccs = ccs;
         this.acggfBytes = acggfBytes;
@@ -48,10 +51,10 @@
     protected void doRun() throws Exception {
         try {
             final CCApplicationContext appCtx = ccs.getApplicationContext();
-            IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) JavaSerializationUtils
-                    .deserialize(acggfBytes);
+            IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils
+                    .deserialize(acggfBytes, deploymentId, appCtx);
             IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, appCtx, jobFlags);
-            JobRun run = new JobRun(ccs, jobId, acgg, jobFlags);
+            JobRun run = new JobRun(ccs, deploymentId, jobId, acgg, jobFlags);
             run.setStatus(JobStatus.INITIALIZED, null);
             ccs.getActiveRunMap().put(jobId, run);
             appCtx.notifyJobCreation(jobId, acggf);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 94e9f92..47a5c09 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -51,7 +51,7 @@
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception;
+    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
             int nPartitions, NetworkAddress networkAddress) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 0cfd161..49ea21e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -31,7 +31,7 @@
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public interface INodeController {
-    public void startTasks(JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index ccf38c2..0e4d366 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.hyracks.control.common.deployment;
 
+import java.io.Serializable;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -8,7 +9,6 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
@@ -18,7 +18,7 @@
     private URLClassLoader classLoader;
 
     @Override
-    public JobSpecification deserializeJobSpecification(byte[] jsBytes) throws HyracksException {
+    public JobSpecification deserialize(byte[] jsBytes) throws HyracksException {
         try {
             if (classLoader == null) {
                 return (JobSpecification) JavaSerializationUtils.deserialize(jsBytes);
@@ -30,7 +30,7 @@
     }
 
     @Override
-    public byte[] serializeJobSpecication(JobSpecification jobSpec) throws HyracksException {
+    public byte[] serialize(Serializable jobSpec) throws HyracksException {
         try {
             if (classLoader == null) {
                 return JavaSerializationUtils.serialize(jobSpec);
@@ -42,30 +42,6 @@
     }
 
     @Override
-    public ActivityClusterGraph deserializeActivityClusterGraph(byte[] acgBytes) throws HyracksException {
-        try {
-            if (classLoader == null) {
-                return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes);
-            }
-            return (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes, classLoader);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
-    public byte[] serializeActivityClusterGraph(ActivityClusterGraph acg) throws HyracksException {
-        try {
-            if (classLoader == null) {
-                return JavaSerializationUtils.serialize(acg);
-            }
-            return JavaSerializationUtils.serialize(acg, classLoader);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
     public void addClassPathURLs(List<URL> binaryURLs) throws HyracksException {
         Collections.sort(binaryURLs, new Comparator<URL>() {
             @Override
@@ -75,11 +51,11 @@
         });
         try {
             if (classLoader == null) {
-                /**crate a new classloader*/
+                /** crate a new classloader */
                 URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
                 classLoader = new URLClassLoader(urls, this.getClass().getClassLoader());
             } else {
-                /**add URLs to the existing classloader*/
+                /** add URLs to the existing classloader */
                 Object[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
                 Method method = classLoader.getClass().getDeclaredMethod("addURL", new Class[] { URL.class });
                 method.setAccessible(true);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index b61f101..68fae70 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -3,10 +3,12 @@
 import java.net.URL;
 import java.util.List;
 
+import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializer;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 
 public class DeploymentUtils {
 
@@ -20,4 +22,17 @@
         jobSerDe.addClassPathURLs(urls);
     }
 
+    public static Object deserialize(byte[] bytes, DeploymentId deploymentId, IApplicationContext appCtx)
+            throws HyracksException {
+        try {
+            IJobSerializerDeserializerContainer jobSerDeContainer = appCtx.getJobSerializerDeserializerContainer();
+            IJobSerializerDeserializer jobSerDe = deploymentId == null ? null : jobSerDeContainer
+                    .getJobSerializerDeerializer(deploymentId);
+            Object obj = jobSerDe == null ? JavaSerializationUtils.deserialize(bytes) : jobSerDe.deserialize(bytes);
+            return obj;
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index d6ed7ec..bb84e4f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -93,8 +93,13 @@
     public static class SendApplicationMessageFunction extends Function {
         private static final long serialVersionUID = 1L;
         private byte[] serializedMessage;
+        private DeploymentId deploymentId;
         private String nodeId;
 
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+
         public String getNodeId() {
             return nodeId;
         }
@@ -107,8 +112,9 @@
             return serializedMessage;
         }
 
-        public SendApplicationMessageFunction(byte[] data, String nodeId) {
+        public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, String nodeId) {
             this.serializedMessage = data;
+            this.deploymentId = deploymentId;
             this.nodeId = nodeId;
         }
 
@@ -587,14 +593,17 @@
     public static class StartTasksFunction extends Function {
         private static final long serialVersionUID = 1L;
 
+        private final DeploymentId deploymentId;
         private final JobId jobId;
         private final byte[] planBytes;
         private final List<TaskAttemptDescriptor> taskDescriptors;
         private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
         private final EnumSet<JobFlag> flags;
 
-        public StartTasksFunction(JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+        public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+                List<TaskAttemptDescriptor> taskDescriptors,
                 Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
+            this.deploymentId = deploymentId;
             this.jobId = jobId;
             this.planBytes = planBytes;
             this.taskDescriptors = taskDescriptors;
@@ -607,6 +616,10 @@
             return FunctionId.START_TASKS;
         }
 
+        public DeploymentId getDeploymentId() {
+            return deploymentId;
+        }
+
         public JobId getJobId() {
             return jobId;
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 10cd935..5ed65cc 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -105,8 +105,9 @@
     }
 
     @Override
-    public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception {
-        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, nodeId);
+    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
+        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+                deploymentId, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index aaed36e..6d6ef41 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -40,10 +40,11 @@
     }
 
     @Override
-    public void startTasks(JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+            List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
-        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(jobId, planBytes, taskDescriptors,
-                connectorPolicies, flags);
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
+                taskDescriptors, connectorPolicies, flags);
         ipcHandle.send(-1, stf, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 64ff8c8..c6eee89 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -445,13 +446,13 @@
                 case SEND_APPLICATION_MESSAGE: {
                     CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
                     queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
-                            .getNodeId()));
+                            .getDeploymentId(), amf.getNodeId()));
                     return;
                 }
                 case START_TASKS: {
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getJobId(), stf.getPlanBytes(),
-                            stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(), stf
+                            .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
                     return;
                 }
 
@@ -498,8 +499,8 @@
         }
     }
 
-    public void sendApplicationMessageToCC(byte[] data, String nodeId) throws Exception {
-        ccs.sendApplicationMessageToCC(data, nodeId);
+    public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
+        ccs.sendApplicationMessageToCC(data, deploymentId, nodeId);
     }
 
     public IDatasetPartitionManager getDatasetPartitionManager() {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index d6ea111..ac76c16 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -354,7 +355,7 @@
     }
 
     @Override
-    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
-        this.ncs.sendApplicationMessageToCC(message, nodeId);
+    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
+        this.ncs.sendApplicationMessageToCC(message, deploymentId, nodeId);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
index a1499b8..19a5a81 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -14,12 +14,12 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
-import java.io.IOException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.messages.IMessage;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -30,11 +30,13 @@
 public class ApplicationMessageWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
     private byte[] message;
+    private DeploymentId deploymentId;
     private String nodeId;
     private NodeControllerService ncs;
 
-    public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String nodeId) {
+    public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) {
         this.ncs = ncs;
+        this.deploymentId = deploymentId;
         this.nodeId = nodeId;
         this.message = message;
     }
@@ -43,16 +45,15 @@
     public void run() {
         NCApplicationContext ctx = ncs.getApplicationContext();
         try {
-            IMessage data = (IMessage) JavaSerializationUtils.deserialize(message);
+            IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);;
             if (ctx.getMessageBroker() != null) {
                 ctx.getMessageBroker().receivedMessage(data, nodeId);
             } else {
                 LOGGER.log(Level.WARNING, "Messsage was sent, but no Message Broker set!");
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
-        } catch (ClassNotFoundException e) {
-            Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+            throw new RuntimeException(e);
         }
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index fffecc2..939276a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -36,13 +36,14 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.ActivityCluster;
 import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
@@ -60,6 +61,8 @@
 
     private final NodeControllerService ncs;
 
+    private final DeploymentId deploymentId;
+
     private final JobId jobId;
 
     private final byte[] acgBytes;
@@ -70,10 +73,11 @@
 
     private final EnumSet<JobFlag> flags;
 
-    public StartTasksWork(NodeControllerService ncs, JobId jobId, byte[] acgBytes,
+    public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
             List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
         this.ncs = ncs;
+        this.deploymentId = deploymentId;
         this.jobId = jobId;
         this.acgBytes = acgBytes;
         this.taskDescriptors = taskDescriptors;
@@ -86,7 +90,7 @@
         try {
             NCApplicationContext appCtx = ncs.getApplicationContext();
             final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
-                    : (ActivityClusterGraph) JavaSerializationUtils.deserialize(acgBytes));
+                    : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx));
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 0ca93b2..8311ebd 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -107,7 +108,7 @@
     }
 
     @Override
-    public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+    public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
         // TODO Auto-generated method stub
 
     }