Added ability to use other ser-deser techniques than just java serialization for IPC. Added custom ser-deser for high-traffic messages

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1049 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index b474ee1..8c9409c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 /**
  * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -65,7 +66,7 @@
     public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
         RPCInterface rpci = new RPCInterface();
-        ipc = new IPCSystem(new InetSocketAddress(0), rpci);
+        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
         IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
         this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index 1424a63..703f74b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -15,20 +15,16 @@
 package edu.uci.ics.hyracks.api.client;
 
 import java.io.Serializable;
-import java.net.InetAddress;
 
 public class NodeControllerInfo implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String nodeId;
 
-    private final InetAddress ipAddress;
-
     private final NodeStatus status;
 
-    public NodeControllerInfo(String nodeId, InetAddress ipAddress, NodeStatus status) {
+    public NodeControllerInfo(String nodeId, NodeStatus status) {
         this.nodeId = nodeId;
-        this.ipAddress = ipAddress;
         this.status = status;
     }
 
@@ -36,10 +32,6 @@
         return nodeId;
     }
 
-    public InetAddress getIpAddress() {
-        return ipAddress;
-    }
-
     public NodeStatus getStatus() {
         return status;
     }
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index 868221d..d176c3c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -15,21 +15,20 @@
 package edu.uci.ics.hyracks.api.comm;
 
 import java.io.Serializable;
-import java.net.InetAddress;
 
 public final class NetworkAddress implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final InetAddress ipAddress;
+    private final byte[] ipAddress;
 
     private final int port;
 
-    public NetworkAddress(InetAddress ipAddress, int port) {
+    public NetworkAddress(byte[] ipAddress, int port) {
         this.ipAddress = ipAddress;
         this.port = port;
     }
 
-    public InetAddress getIpAddress() {
+    public byte[] getIpAddress() {
         return ipAddress;
     }
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 75cc245..41b4c23 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -19,9 +19,9 @@
 public final class ActivityId implements Serializable {
     private static final long serialVersionUID = 1L;
     private final OperatorDescriptorId odId;
-    private final long id;
+    private final int id;
 
-    public ActivityId(OperatorDescriptorId odId, long id) {
+    public ActivityId(OperatorDescriptorId odId, int id) {
         this.odId = odId;
         this.id = id;
     }
@@ -30,7 +30,7 @@
         return odId;
     }
 
-    public long getLocalId() {
+    public int getLocalId() {
         return id;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 2578b88..d3e472b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,8 +60,8 @@
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -69,6 +69,7 @@
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 public class ClusterControllerService extends AbstractRemoteService {
     private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
@@ -119,9 +120,11 @@
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
         executor = Executors.newCachedThreadPool();
         IIPCI ccIPCI = new ClusterControllerIPCI();
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI);
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+                new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI);
+        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
         activeRunMap = new HashMap<JobId, JobRun>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -317,69 +320,69 @@
     private class ClusterControllerIPCI implements IIPCI {
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
-            ClusterControllerFunctions.Function fn = (Function) payload;
+            CCNCFunctions.Function fn = (Function) payload;
             switch (fn.getFunctionId()) {
                 case REGISTER_NODE: {
-                    ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+                    CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
                     workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
                     return;
                 }
 
                 case UNREGISTER_NODE: {
-                    ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+                    CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
                     workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
                     return;
                 }
 
                 case NODE_HEARTBEAT: {
-                    ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+                    CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
                     workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
                             .getHeartbeatData()));
                     return;
                 }
 
                 case NOTIFY_JOBLET_CLEANUP: {
-                    ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+                    CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
                     workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
                             njcf.getJobId(), njcf.getNodeId()));
                     return;
                 }
 
                 case REPORT_PROFILE: {
-                    ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+                    CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
                     workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
                     return;
                 }
 
                 case NOTIFY_TASK_COMPLETE: {
-                    ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+                    CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
                     workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
                             .getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
                     return;
                 }
                 case NOTIFY_TASK_FAILURE: {
-                    ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+                    CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
                     workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
                             .getTaskId(), ntff.getDetails(), ntff.getDetails()));
                     return;
                 }
 
                 case REGISTER_PARTITION_PROVIDER: {
-                    ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+                    CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
                     workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
                             .getPartitionDescriptor()));
                     return;
                 }
 
                 case REGISTER_PARTITION_REQUEST: {
-                    ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+                    CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
                     workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
                             .getPartitionRequest()));
                     return;
                 }
 
                 case APPLICATION_STATE_CHANGE_RESPONSE: {
-                    ClusterControllerFunctions.ApplicationStateChangeResponseFunction astrf = (ClusterControllerFunctions.ApplicationStateChangeResponseFunction) fn;
+                    CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
                     workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
                     return;
                 }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
index 2d8ae85..f6271fe 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
@@ -6,7 +6,7 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
@@ -14,10 +14,10 @@
     private static final Logger LOGGER = Logger.getLogger(ApplicationStateChangeWork.class.getName());
 
     private final ClusterControllerService ccs;
-    private final ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf;
+    private final CCNCFunctions.ApplicationStateChangeResponseFunction ascrf;
 
     public ApplicationStateChangeWork(ClusterControllerService ccs,
-            ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf) {
+            CCNCFunctions.ApplicationStateChangeResponseFunction ascrf) {
         this.ccs = ccs;
         this.ascrf = ascrf;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 49a3be9..9e8d130 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
         Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
-                    NodeStatus.ALIVE));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE));
         }
         callback.setValue(result);
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index cba4b4b5..e4e0aae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -25,7 +25,7 @@
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -46,7 +46,7 @@
         String id = reg.getNodeId();
 
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
-        NodeControllerFunctions.NodeRegistrationResult result = null;
+        CCNCFunctions.NodeRegistrationResult result = null;
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
 
@@ -69,9 +69,9 @@
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());
             params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
             params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
-            result = new NodeControllerFunctions.NodeRegistrationResult(params, null);
+            result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
-            result = new NodeControllerFunctions.NodeRegistrationResult(null, e);
+            result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }
         ncIPCHandle.send(-1, result, null);
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
new file mode 100644
index 0000000..9ea4636
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -0,0 +1,792 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+public class CCNCFunctions {
+    private static final int FID_CODE_SIZE = 1;
+
+    public enum FunctionId {
+        REGISTER_NODE,
+        UNREGISTER_NODE,
+        NOTIFY_JOBLET_CLEANUP,
+        NOTIFY_TASK_COMPLETE,
+        NOTIFY_TASK_FAILURE,
+        NODE_HEARTBEAT,
+        REPORT_PROFILE,
+        REGISTER_PARTITION_PROVIDER,
+        REGISTER_PARTITION_REQUEST,
+        APPLICATION_STATE_CHANGE_RESPONSE,
+
+        NODE_REGISTRATION_RESULT,
+        START_TASKS,
+        ABORT_TASKS,
+        CLEANUP_JOBLET,
+        CREATE_APPLICATION,
+        DESTROY_APPLICATION,
+        REPORT_PARTITION_AVAILABILITY,
+
+        OTHER
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class RegisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeRegistration reg;
+
+        public RegisterNodeFunction(NodeRegistration reg) {
+            this.reg = reg;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_NODE;
+        }
+
+        public NodeRegistration getNodeRegistration() {
+            return reg;
+        }
+    }
+
+    public static class UnregisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        public UnregisterNodeFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNREGISTER_NODE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NotifyTaskCompleteFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final TaskProfile statistics;
+
+        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.statistics = statistics;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_COMPLETE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public TaskProfile getStatistics() {
+            return statistics;
+        }
+    }
+
+    public static class NotifyTaskFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final String details;
+
+        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.details = details;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getDetails() {
+            return details;
+        }
+    }
+
+    public static class NotifyJobletCleanupFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final String nodeId;
+
+        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_JOBLET_CLEANUP;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NodeHeartbeatFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final HeartbeatData hbData;
+
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+            this.nodeId = nodeId;
+            this.hbData = hbData;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public HeartbeatData getHeartbeatData() {
+            return hbData;
+        }
+    }
+
+    public static class ReportProfileFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final List<JobProfile> profiles;
+
+        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+            this.nodeId = nodeId;
+            this.profiles = profiles;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PROFILE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public List<JobProfile> getProfiles() {
+            return profiles;
+        }
+    }
+
+    public static class RegisterPartitionProviderFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionDescriptor partitionDescriptor;
+
+        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+            this.partitionDescriptor = partitionDescriptor;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_PROVIDER;
+        }
+
+        public PartitionDescriptor getPartitionDescriptor() {
+            return partitionDescriptor;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read nodeId
+            String nodeId = dis.readUTF();
+
+            // Read TaskAttemptId
+            TaskAttemptId taId = readTaskAttemptId(dis);
+
+            // Read reusable flag
+            boolean reusable = dis.readBoolean();
+
+            // Read Partition State
+            PartitionState state = readPartitionState(dis);
+
+            PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
+            pd.setState(state);
+            return new RegisterPartitionProviderFunction(pd);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            PartitionDescriptor pd = fn.getPartitionDescriptor();
+
+            // Write PartitionId
+            writePartitionId(dos, pd.getPartitionId());
+
+            // Write nodeId
+            dos.writeUTF(pd.getNodeId());
+
+            // Write TaskAttemptId
+            writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
+
+            // Write reusable flag
+            dos.writeBoolean(pd.isReusable());
+
+            // Write Partition State
+            writePartitionState(dos, pd.getState());
+        }
+    }
+
+    public static class RegisterPartitionRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionRequest partitionRequest;
+
+        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+            this.partitionRequest = partitionRequest;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_REQUEST;
+        }
+
+        public PartitionRequest getPartitionRequest() {
+            return partitionRequest;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read nodeId
+            String nodeId = dis.readUTF();
+
+            // Read TaskAttemptId
+            TaskAttemptId taId = readTaskAttemptId(dis);
+
+            // Read Partition State
+            PartitionState state = readPartitionState(dis);
+
+            PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
+            return new RegisterPartitionRequestFunction(pr);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            PartitionRequest pr = fn.getPartitionRequest();
+
+            // Write PartitionId
+            writePartitionId(dos, pr.getPartitionId());
+
+            // Write nodeId
+            dos.writeUTF(pr.getNodeId());
+
+            // Write TaskAttemptId
+            writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
+
+            // Write Partition State
+            writePartitionState(dos, pr.getMinimumState());
+        }
+    }
+
+    public static class ApplicationStateChangeResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final String appName;
+        private final ApplicationStatus status;
+
+        public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+            this.nodeId = nodeId;
+            this.appName = appName;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getApplicationName() {
+            return appName;
+        }
+
+        public ApplicationStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class NodeRegistrationResult extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeParameters params;
+
+        private final Exception exception;
+
+        public NodeRegistrationResult(NodeParameters params, Exception exception) {
+            this.params = params;
+            this.exception = exception;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_REGISTRATION_RESULT;
+        }
+
+        public NodeParameters getNodeParameters() {
+            return params;
+        }
+
+        public Exception getException() {
+            return exception;
+        }
+    }
+
+    public static class StartTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final JobId jobId;
+        private final byte[] planBytes;
+        private final List<TaskAttemptDescriptor> taskDescriptors;
+        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+                List<TaskAttemptDescriptor> taskDescriptors,
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+            this.appName = appName;
+            this.jobId = jobId;
+            this.planBytes = planBytes;
+            this.taskDescriptors = taskDescriptors;
+            this.connectorPolicies = connectorPolicies;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_TASKS;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getPlanBytes() {
+            return planBytes;
+        }
+
+        public List<TaskAttemptDescriptor> getTaskDescriptors() {
+            return taskDescriptors;
+        }
+
+        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+            return connectorPolicies;
+        }
+    }
+
+    public static class AbortTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final List<TaskAttemptId> tasks;
+
+        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+            this.jobId = jobId;
+            this.tasks = tasks;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_TASKS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public List<TaskAttemptId> getTasks() {
+            return tasks;
+        }
+    }
+
+    public static class CleanupJobletFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+        private final JobStatus status;
+
+        public CleanupJobletFunction(JobId jobId, JobStatus status) {
+            this.jobId = jobId;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLEANUP_JOBLET;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public JobStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final boolean deployHar;
+        private final byte[] serializedDistributedState;
+
+        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+            this.appName = appName;
+            this.deployHar = deployHar;
+            this.serializedDistributedState = serializedDistributedState;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public boolean isDeployHar() {
+            return deployHar;
+        }
+
+        public byte[] getSerializedDistributedState() {
+            return serializedDistributedState;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class ReportPartitionAvailabilityFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final PartitionId pid;
+        private final NetworkAddress networkAddress;
+
+        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+            this.pid = pid;
+            this.networkAddress = networkAddress;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PARTITION_AVAILABILITY;
+        }
+
+        public PartitionId getPartitionId() {
+            return pid;
+        }
+
+        public NetworkAddress getNetworkAddress() {
+            return networkAddress;
+        }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read NetworkAddress
+            NetworkAddress networkAddress = readNetworkAddress(dis);
+
+            return new ReportPartitionAvailabilityFunction(pid, networkAddress);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            // Write PartitionId
+            writePartitionId(dos, fn.getPartitionId());
+
+            // Write NetworkAddress
+            writeNetworkAddress(dos, fn.getNetworkAddress());
+        }
+    }
+
+    public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
+        private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
+
+        public SerializerDeserializer() {
+            javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
+        }
+
+        @Override
+        public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+            if (length < FID_CODE_SIZE) {
+                throw new IllegalStateException("Message size too small: " + length);
+            }
+            byte fid = buffer.get();
+            return deserialize(fid, buffer, length - FID_CODE_SIZE);
+        }
+
+        @Override
+        public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+            if (length < FID_CODE_SIZE) {
+                throw new IllegalStateException("Message size too small: " + length);
+            }
+            byte fid = buffer.get();
+            if (fid != FunctionId.OTHER.ordinal()) {
+                throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
+            }
+            return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+        }
+
+        @Override
+        public byte[] serializeObject(Object object) throws Exception {
+            if (object instanceof Function) {
+                Function fn = (Function) object;
+                return serialize(object, (byte) fn.getFunctionId().ordinal());
+            } else {
+                return serialize(object, (byte) FunctionId.OTHER.ordinal());
+            }
+        }
+
+        @Override
+        public byte[] serializeException(Exception object) throws Exception {
+            return serialize(object, (byte) FunctionId.OTHER.ordinal());
+        }
+
+        private byte[] serialize(Object object, byte fid) throws Exception {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            baos.write(fid);
+            serialize(baos, object, fid);
+            JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
+            baos.close();
+            return baos.toByteArray();
+        }
+
+        private void serialize(OutputStream out, Object object, byte fid) throws Exception {
+            switch (FunctionId.values()[fid]) {
+                case REGISTER_PARTITION_PROVIDER:
+                    RegisterPartitionProviderFunction.serialize(out, object);
+                    return;
+
+                case REGISTER_PARTITION_REQUEST:
+                    RegisterPartitionRequestFunction.serialize(out, object);
+                    return;
+
+                case REPORT_PARTITION_AVAILABILITY:
+                    ReportPartitionAvailabilityFunction.serialize(out, object);
+                    return;
+            }
+            JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
+        }
+
+        private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+            switch (FunctionId.values()[fid]) {
+                case REGISTER_PARTITION_PROVIDER:
+                    return RegisterPartitionProviderFunction.deserialize(buffer, length);
+
+                case REGISTER_PARTITION_REQUEST:
+                    return RegisterPartitionRequestFunction.deserialize(buffer, length);
+
+                case REPORT_PARTITION_AVAILABILITY:
+                    return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+            }
+
+            return javaSerde.deserializeObject(buffer, length);
+        }
+    }
+
+    private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
+        long jobId = dis.readLong();
+        int cdid = dis.readInt();
+        int senderIndex = dis.readInt();
+        int receiverIndex = dis.readInt();
+        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+        return pid;
+    }
+
+    private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
+        dos.writeLong(pid.getJobId().getId());
+        dos.writeInt(pid.getConnectorDescriptorId().getId());
+        dos.writeInt(pid.getSenderIndex());
+        dos.writeInt(pid.getReceiverIndex());
+    }
+
+    private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
+        int odid = dis.readInt();
+        int aid = dis.readInt();
+        int partition = dis.readInt();
+        int attempt = dis.readInt();
+        TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
+                partition), attempt);
+        return taId;
+    }
+
+    private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
+        TaskId tid = taId.getTaskId();
+        ActivityId aid = tid.getActivityId();
+        OperatorDescriptorId odId = aid.getOperatorDescriptorId();
+        dos.writeInt(odId.getId());
+        dos.writeInt(aid.getLocalId());
+        dos.writeInt(tid.getPartition());
+        dos.writeInt(taId.getAttempt());
+    }
+
+    private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
+        PartitionState state = PartitionState.values()[dis.readInt()];
+        return state;
+    }
+
+    private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
+        dos.writeInt(state.ordinal());
+    }
+
+    private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
+        int bLen = dis.readInt();
+        byte[] ipAddress = new byte[bLen];
+        dis.read(ipAddress);
+        int port = dis.readInt();
+        NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+        return networkAddress;
+    }
+
+    private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
+        byte[] ipAddress = networkAddress.getIpAddress();
+        dos.writeInt(ipAddress.length);
+        dos.write(ipAddress);
+        dos.writeInt(networkAddress.getPort());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
deleted file mode 100644
index 2b6792e..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
-import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
-
-public class ClusterControllerFunctions {
-    public enum FunctionId {
-        REGISTER_NODE,
-        UNREGISTER_NODE,
-        NOTIFY_TASK_COMPLETE,
-        NOTIFY_TASK_FAILURE,
-        NOTIFY_JOBLET_CLEANUP,
-        NODE_HEARTBEAT,
-        REPORT_PROFILE,
-        REGISTER_PARTITION_PROVIDER,
-        REGISTER_PARTITION_REQUEST,
-        APPLICATION_STATE_CHANGE_RESPONSE,
-    }
-
-    public static abstract class Function implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        public abstract FunctionId getFunctionId();
-    }
-
-    public static class RegisterNodeFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final NodeRegistration reg;
-
-        public RegisterNodeFunction(NodeRegistration reg) {
-            this.reg = reg;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_NODE;
-        }
-
-        public NodeRegistration getNodeRegistration() {
-            return reg;
-        }
-    }
-
-    public static class UnregisterNodeFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-
-        public UnregisterNodeFunction(String nodeId) {
-            this.nodeId = nodeId;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.UNREGISTER_NODE;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-    }
-
-    public static class NotifyTaskCompleteFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final TaskAttemptId taskId;
-        private final String nodeId;
-        private final TaskProfile statistics;
-
-        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
-            this.jobId = jobId;
-            this.taskId = taskId;
-            this.nodeId = nodeId;
-            this.statistics = statistics;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NOTIFY_TASK_COMPLETE;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public TaskAttemptId getTaskId() {
-            return taskId;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public TaskProfile getStatistics() {
-            return statistics;
-        }
-    }
-
-    public static class NotifyTaskFailureFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final TaskAttemptId taskId;
-        private final String nodeId;
-        private final String details;
-
-        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
-            this.jobId = jobId;
-            this.taskId = taskId;
-            this.nodeId = nodeId;
-            this.details = details;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NOTIFY_TASK_FAILURE;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public TaskAttemptId getTaskId() {
-            return taskId;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public String getDetails() {
-            return details;
-        }
-    }
-
-    public static class NotifyJobletCleanupFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final String nodeId;
-
-        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
-            this.jobId = jobId;
-            this.nodeId = nodeId;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NOTIFY_JOBLET_CLEANUP;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-    }
-
-    public static class NodeHeartbeatFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-        private final HeartbeatData hbData;
-
-        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
-            this.nodeId = nodeId;
-            this.hbData = hbData;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NODE_HEARTBEAT;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public HeartbeatData getHeartbeatData() {
-            return hbData;
-        }
-    }
-
-    public static class ReportProfileFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-        private final List<JobProfile> profiles;
-
-        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
-            this.nodeId = nodeId;
-            this.profiles = profiles;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_PROFILE;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public List<JobProfile> getProfiles() {
-            return profiles;
-        }
-    }
-
-    public static class RegisterPartitionProviderFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final PartitionDescriptor partitionDescriptor;
-
-        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
-            this.partitionDescriptor = partitionDescriptor;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_PARTITION_PROVIDER;
-        }
-
-        public PartitionDescriptor getPartitionDescriptor() {
-            return partitionDescriptor;
-        }
-    }
-
-    public static class RegisterPartitionRequestFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final PartitionRequest partitionRequest;
-
-        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
-            this.partitionRequest = partitionRequest;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_PARTITION_REQUEST;
-        }
-
-        public PartitionRequest getPartitionRequest() {
-            return partitionRequest;
-        }
-    }
-
-    public static class ApplicationStateChangeResponseFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-        private final String appName;
-        private final ApplicationStatus status;
-
-        public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
-            this.nodeId = nodeId;
-            this.appName = appName;
-            this.status = status;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public String getApplicationName() {
-            return appName;
-        }
-
-        public ApplicationStatus getStatus() {
-            return status;
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index b7dd0af..a0dabdd 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -37,13 +37,13 @@
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
-        ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+        CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
                 nodeId);
         ipcHandle.send(-1, fn, null);
     }
@@ -51,56 +51,56 @@
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
                 jobId, taskId, nodeId, statistics);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
                 jobId, taskId, nodeId, details);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
                 jobId, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
                 hbData);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
                 profiles);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
-        ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+        CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
                 partitionDescriptor);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
-        ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+        CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
                 partitionRequest);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
-        ClusterControllerFunctions.ApplicationStateChangeResponseFunction fn = new ClusterControllerFunctions.ApplicationStateChangeResponseFunction(
+        CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
                 nodeId, appName, status);
         ipcHandle.send(-1, fn, null);
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
deleted file mode 100644
index b72d4e5..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-
-public class NodeControllerFunctions {
-    public enum FunctionId {
-        NODE_REGISTRATION_RESULT,
-        START_TASKS,
-        ABORT_TASKS,
-        CLEANUP_JOBLET,
-        CREATE_APPLICATION,
-        DESTROY_APPLICATION,
-        REPORT_PARTITION_AVAILABILITY
-    }
-
-    public static abstract class Function implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        public abstract FunctionId getFunctionId();
-    }
-
-    public static class NodeRegistrationResult extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final NodeParameters params;
-
-        private final Exception exception;
-
-        public NodeRegistrationResult(NodeParameters params, Exception exception) {
-            this.params = params;
-            this.exception = exception;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NODE_REGISTRATION_RESULT;
-        }
-
-        public NodeParameters getNodeParameters() {
-            return params;
-        }
-
-        public Exception getException() {
-            return exception;
-        }
-    }
-
-    public static class StartTasksFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String appName;
-        private final JobId jobId;
-        private final byte[] planBytes;
-        private final List<TaskAttemptDescriptor> taskDescriptors;
-        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-
-        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
-                List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
-            this.appName = appName;
-            this.jobId = jobId;
-            this.planBytes = planBytes;
-            this.taskDescriptors = taskDescriptors;
-            this.connectorPolicies = connectorPolicies;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.START_TASKS;
-        }
-
-        public String getAppName() {
-            return appName;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public byte[] getPlanBytes() {
-            return planBytes;
-        }
-
-        public List<TaskAttemptDescriptor> getTaskDescriptors() {
-            return taskDescriptors;
-        }
-
-        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
-            return connectorPolicies;
-        }
-    }
-
-    public static class AbortTasksFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final List<TaskAttemptId> tasks;
-
-        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
-            this.jobId = jobId;
-            this.tasks = tasks;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.ABORT_TASKS;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public List<TaskAttemptId> getTasks() {
-            return tasks;
-        }
-    }
-
-    public static class CleanupJobletFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final JobStatus status;
-
-        public CleanupJobletFunction(JobId jobId, JobStatus status) {
-            this.jobId = jobId;
-            this.status = status;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.CLEANUP_JOBLET;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public JobStatus getStatus() {
-            return status;
-        }
-    }
-
-    public static class CreateApplicationFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String appName;
-        private final boolean deployHar;
-        private final byte[] serializedDistributedState;
-
-        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
-            this.appName = appName;
-            this.deployHar = deployHar;
-            this.serializedDistributedState = serializedDistributedState;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.CREATE_APPLICATION;
-        }
-
-        public String getAppName() {
-            return appName;
-        }
-
-        public boolean isDeployHar() {
-            return deployHar;
-        }
-
-        public byte[] getSerializedDistributedState() {
-            return serializedDistributedState;
-        }
-    }
-
-    public static class DestroyApplicationFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String appName;
-
-        public DestroyApplicationFunction(String appName) {
-            this.appName = appName;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.DESTROY_APPLICATION;
-        }
-
-        public String getAppName() {
-            return appName;
-        }
-    }
-
-    public static class ReportPartitionAvailabilityFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final PartitionId pid;
-        private final NetworkAddress networkAddress;
-
-        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
-            this.pid = pid;
-            this.networkAddress = networkAddress;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_PARTITION_AVAILABILITY;
-        }
-
-        public PartitionId getPartitionId() {
-            return pid;
-        }
-
-        public NetworkAddress getNetworkAddress() {
-            return networkAddress;
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 474fb93..961a9e3 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -38,20 +38,20 @@
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
-        NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId,
                 planBytes, taskDescriptors, connectorPolicies);
         ipcHandle.send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+        CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
         ipcHandle.send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+        CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId,
                 status);
         ipcHandle.send(-1, cjf, null);
     }
@@ -59,21 +59,21 @@
     @Override
     public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
             throws Exception {
-        NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+        CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(
                 appName, deployHar, serializedDistributedState);
         ipcHandle.send(-1, caf, null);
     }
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+        CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(
                 appName);
         ipcHandle.send(-1, daf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
-        NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+        CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
         ipcHandle.send(-1, rpaf, null);
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ba1d970..0f512a7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -50,8 +50,8 @@
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -124,7 +124,8 @@
         id = ncConfig.nodeId;
         executor = Executors.newCachedThreadPool();
         NodeControllerIPCI ipci = new NodeControllerIPCI();
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci);
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+                new CCNCFunctions.SerializerDeserializer());
         this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
         if (id == null) {
             throw new Exception("id not set");
@@ -347,49 +348,49 @@
     private final class NodeControllerIPCI implements IIPCI {
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
-            NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) payload;
+            CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case START_TASKS: {
-                    NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+                    CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
                     queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
                             .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
                     return;
                 }
 
                 case ABORT_TASKS: {
-                    NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+                    CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
                     queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
                     return;
                 }
 
                 case CLEANUP_JOBLET: {
-                    NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+                    CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
                     queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
                     return;
                 }
 
                 case CREATE_APPLICATION: {
-                    NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+                    CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
                     queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
                             .isDeployHar(), caf.getSerializedDistributedState()));
                     return;
                 }
 
                 case DESTROY_APPLICATION: {
-                    NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+                    CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
                     queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
                     return;
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
                             .getPartitionId(), rpaf.getNetworkAddress()));
                     return;
                 }
 
                 case NODE_REGISTRATION_RESULT: {
-                    NodeControllerFunctions.NodeRegistrationResult nrrf = (NodeControllerFunctions.NodeRegistrationResult) fn;
+                    CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
                     setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
                     return;
                 }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index f94be22..1a0a820 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -60,7 +60,7 @@
     public void start() throws IOException {
         md.start();
         InetSocketAddress sockAddr = md.getLocalAddress();
-        networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
     }
 
     public NetworkAddress getNetworkAddress() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index ce3f4e0..9734567 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.logging.Logger;
@@ -48,8 +49,8 @@
         Joblet ji = jobletMap.get(pid.getJobId());
         if (ji != null) {
             PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
-                    ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
-                            networkAddress.getPort()), pid, 5));
+                    ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                            .getIpAddress()), networkAddress.getPort()), pid, 5));
             ji.reportPartitionAvailability(channel);
         }
     }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..62648ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.nio.ByteBuffer;
+
+public interface IPayloadSerializerDeserializer {
+    public Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+
+    public Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
+
+    public byte[] serializeObject(Object object) throws Exception;
+
+    public byte[] serializeException(Exception object) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 09e8c2f..053ac6b 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -276,7 +276,7 @@
                             }
                         }
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index a01b1d9..3d3bc7a 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -54,6 +54,10 @@
         return remoteAddress;
     }
 
+    IPCSystem getIPCSystem() {
+        return system;
+    }
+
     void setRemoteAddress(InetSocketAddress remoteAddress) {
         this.remoteAddress = remoteAddress;
     }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 5f60c6d..6c9c82c 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 
 public class IPCSystem {
@@ -27,15 +28,15 @@
 
     private final IIPCI ipci;
 
+    private final IPayloadSerializerDeserializer serde;
+
     private final AtomicLong midFactory;
 
-    public IPCSystem(InetSocketAddress socketAddress) throws IOException {
-        this(socketAddress, null);
-    }
-
-    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci) throws IOException {
+    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
+            throws IOException {
         cMgr = new IPCConnectionManager(this, socketAddress);
         this.ipci = ipci;
+        this.serde = serde;
         midFactory = new AtomicLong();
     }
 
@@ -57,6 +58,10 @@
         }
     }
 
+    IPayloadSerializerDeserializer getSerializerDeserializer() {
+        return serde;
+    }
+
     long createMessageId() {
         return midFactory.incrementAndGet();
     }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..fdf8e92
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
+public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+    @Override
+    public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+        return deserialize(buffer, length);
+    }
+
+    @Override
+    public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+        return (Exception) deserialize(buffer, length);
+    }
+
+    @Override
+    public byte[] serializeObject(Object object) throws Exception {
+        return serialize(object);
+    }
+
+    @Override
+    public byte[] serializeException(Exception exception) throws Exception {
+        return serialize(exception);
+    }
+
+    public static void serialize(OutputStream out, Object object) throws Exception {
+        ObjectOutputStream oos = new ObjectOutputStream(out);
+        oos.writeObject(object);
+        oos.flush();
+    }
+
+    private Object deserialize(ByteBuffer buffer, int length) throws Exception {
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+                length));
+        Object object = ois.readObject();
+        ois.close();
+        return object;
+    }
+
+    private byte[] serialize(Object object) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        serialize(baos, object);
+        baos.close();
+        return baos.toByteArray();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
index 6bff56f..6bb3156 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -14,13 +14,10 @@
  */
 package edu.uci.ics.hyracks.ipc.impl;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
 class Message {
     private static final int MSG_SIZE_SIZE = 4;
 
@@ -92,29 +89,26 @@
         return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
     }
 
-    void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+    void read(ByteBuffer buffer) throws Exception {
         assert hasMessage(buffer);
         int msgSize = buffer.getInt();
         messageId = buffer.getLong();
         requestMessageId = buffer.getLong();
         flag = buffer.get();
         int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+        int length = msgSize - HEADER_SIZE;
         try {
-            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
-                    msgSize - HEADER_SIZE));
-            payload = ois.readObject();
-            ois.close();
+            IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+            payload = flag == ERROR ? serde.deserializeException(buffer, length) : serde.deserializeObject(buffer,
+                    length);
         } finally {
             buffer.position(finalPosition);
         }
     }
 
-    boolean write(ByteBuffer buffer) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(payload);
-        oos.close();
-        byte[] bytes = baos.toByteArray();
+    boolean write(ByteBuffer buffer) throws Exception {
+        IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+        byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
         if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
             buffer.putInt(HEADER_SIZE + bytes.length);
             buffer.putLong(messageId);
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index dac93dd..5b2f660 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 public class IPCTest {
     @Test
@@ -80,10 +81,12 @@
                 });
             }
         };
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci);
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
     }
 
     private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci);
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
     }
 }
\ No newline at end of file