Merged hyracks_dev_next into this branch.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1064 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index b474ee1..8c9409c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 /**
  * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -65,7 +66,7 @@
     public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
         RPCInterface rpci = new RPCInterface();
-        ipc = new IPCSystem(new InetSocketAddress(0), rpci);
+        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
         ipc.start();
         IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
         this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index 1424a63..703f74b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -15,20 +15,16 @@
 package edu.uci.ics.hyracks.api.client;
 
 import java.io.Serializable;
-import java.net.InetAddress;
 
 public class NodeControllerInfo implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String nodeId;
 
-    private final InetAddress ipAddress;
-
     private final NodeStatus status;
 
-    public NodeControllerInfo(String nodeId, InetAddress ipAddress, NodeStatus status) {
+    public NodeControllerInfo(String nodeId, NodeStatus status) {
         this.nodeId = nodeId;
-        this.ipAddress = ipAddress;
         this.status = status;
     }
 
@@ -36,10 +32,6 @@
         return nodeId;
     }
 
-    public InetAddress getIpAddress() {
-        return ipAddress;
-    }
-
     public NodeStatus getStatus() {
         return status;
     }
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index 868221d..d176c3c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -15,21 +15,20 @@
 package edu.uci.ics.hyracks.api.comm;
 
 import java.io.Serializable;
-import java.net.InetAddress;
 
 public final class NetworkAddress implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final InetAddress ipAddress;
+    private final byte[] ipAddress;
 
     private final int port;
 
-    public NetworkAddress(InetAddress ipAddress, int port) {
+    public NetworkAddress(byte[] ipAddress, int port) {
         this.ipAddress = ipAddress;
         this.port = port;
     }
 
-    public InetAddress getIpAddress() {
+    public byte[] getIpAddress() {
         return ipAddress;
     }
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 75cc245..41b4c23 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -19,9 +19,9 @@
 public final class ActivityId implements Serializable {
     private static final long serialVersionUID = 1L;
     private final OperatorDescriptorId odId;
-    private final long id;
+    private final int id;
 
-    public ActivityId(OperatorDescriptorId odId, long id) {
+    public ActivityId(OperatorDescriptorId odId, int id) {
         this.odId = odId;
         this.id = id;
     }
@@ -30,7 +30,7 @@
         return odId;
     }
 
-    public long getLocalId() {
+    public int getLocalId() {
         return id;
     }
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index ec8ab76..c816d26 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -44,7 +44,8 @@
     public ConnectorDescriptorId getConnectorId();
 
     /**
-     * Factory method to create the send side writer that writes into this connector.
+     * Factory method to create the send side writer that writes into this
+     * connector.
      * 
      * @param ctx
      *            Context
@@ -66,7 +67,8 @@
             throws HyracksDataException;
 
     /**
-     * Factory metod to create the receive side reader that reads data from this connector.
+     * Factory method to create the receive side reader that reads data from this
+     * connector.
      * 
      * @param ctx
      *            Context
@@ -96,12 +98,23 @@
             ICCApplicationContext appCtx);
 
     /**
-     * Indicate which consumer partitions may receive data from the given producer partition.
+     * Indicate which consumer partitions may receive data from the given
+     * producer partition.
      */
     public void indicateTargetPartitions(int nProducerPartitions, int nConsumerPartitions, int producerIndex,
             BitSet targetBitmap);
 
     /**
+     * Gets the display name.
+     */
+    public String getDisplayName();
+
+    /**
+     * Sets the display name.
+     */
+    public void setDisplayName(String displayName);
+
+    /**
      * Translate this connector descriptor to JSON.
      * 
      * @return
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 78847eb..c37a530 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -59,7 +59,8 @@
     public RecordDescriptor[] getOutputRecordDescriptors();
 
     /**
-     * Contributes the activity graph that describes the behavior of this operator.
+     * Contributes the activity graph that describes the behavior of this
+     * operator.
      * 
      * @param builder
      *            - graph builder
@@ -78,6 +79,16 @@
             ICCApplicationContext appCtx);
 
     /**
+     * Gets the display name.
+     */
+    public String getDisplayName();
+
+    /**
+     * Sets the display name.
+     */
+    public void setDisplayName(String displayName);
+
+    /**
      * Translates this operator descriptor to JSON.
      */
     public JSONObject toJSON() throws JSONException;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 2578b88..d3e472b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,8 +60,8 @@
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -69,6 +69,7 @@
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 public class ClusterControllerService extends AbstractRemoteService {
     private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
@@ -119,9 +120,11 @@
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
         executor = Executors.newCachedThreadPool();
         IIPCI ccIPCI = new ClusterControllerIPCI();
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI);
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+                new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI);
+        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
         activeRunMap = new HashMap<JobId, JobRun>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -317,69 +320,69 @@
     private class ClusterControllerIPCI implements IIPCI {
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
-            ClusterControllerFunctions.Function fn = (Function) payload;
+            CCNCFunctions.Function fn = (Function) payload;
             switch (fn.getFunctionId()) {
                 case REGISTER_NODE: {
-                    ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+                    CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
                     workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
                     return;
                 }
 
                 case UNREGISTER_NODE: {
-                    ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+                    CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
                     workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
                     return;
                 }
 
                 case NODE_HEARTBEAT: {
-                    ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+                    CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
                     workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
                             .getHeartbeatData()));
                     return;
                 }
 
                 case NOTIFY_JOBLET_CLEANUP: {
-                    ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+                    CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
                     workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
                             njcf.getJobId(), njcf.getNodeId()));
                     return;
                 }
 
                 case REPORT_PROFILE: {
-                    ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+                    CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
                     workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
                     return;
                 }
 
                 case NOTIFY_TASK_COMPLETE: {
-                    ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+                    CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
                     workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
                             .getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
                     return;
                 }
                 case NOTIFY_TASK_FAILURE: {
-                    ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+                    CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
                     workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
                             .getTaskId(), ntff.getDetails(), ntff.getDetails()));
                     return;
                 }
 
                 case REGISTER_PARTITION_PROVIDER: {
-                    ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+                    CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
                     workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
                             .getPartitionDescriptor()));
                     return;
                 }
 
                 case REGISTER_PARTITION_REQUEST: {
-                    ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+                    CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
                     workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
                             .getPartitionRequest()));
                     return;
                 }
 
                 case APPLICATION_STATE_CHANGE_RESPONSE: {
-                    ClusterControllerFunctions.ApplicationStateChangeResponseFunction astrf = (ClusterControllerFunctions.ApplicationStateChangeResponseFunction) fn;
+                    CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
                     workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
                     return;
                 }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 929462b..e6e6c2f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -49,6 +49,8 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
 
 public class ActivityClusterPlanner {
+    private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;
+
     private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
 
     private final JobScheduler scheduler;
@@ -64,13 +66,28 @@
         JobRun jobRun = scheduler.getJobRun();
         Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
 
+        Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
+
+        assignConnectorPolicy(ac, activityPlanMap);
+
+        TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Plan for " + ac);
+            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
+            for (TaskCluster tc : taskClusters) {
+                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+            }
+        }
+
+        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
+    }
+
+    private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
+            Map<ActivityId, ActivityPartitionDetails> pcMap) {
         Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
-        Set<ActivityId> activities = ac.getActivities();
-
-        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-
         Set<ActivityId> depAnIds = new HashSet<ActivityId>();
-        for (ActivityId anId : activities) {
+        for (ActivityId anId : ac.getActivities()) {
             depAnIds.clear();
             getDependencyActivityIds(depAnIds, anId);
             ActivityPartitionDetails apd = pcMap.get(anId);
@@ -94,53 +111,21 @@
                     tasks[i].getDependencies().add(dTaskId);
                     dTask.getDependents().add(tid);
                 }
-                Set<TaskId> cluster = new HashSet<TaskId>();
-                cluster.add(tid);
-                taskClusterMap.put(tid, cluster);
             }
             activityPlan.setTasks(tasks);
             activityPlanMap.put(anId, activityPlan);
         }
+        return activityPlanMap;
+    }
 
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, activityPlanMap);
-        scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
+    private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
+            Map<ActivityId, ActivityPlan> activityPlanMap) {
+        Set<ActivityId> activities = ac.getActivities();
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
+                activityPlanMap, activities);
 
-        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
-        JobActivityGraph jag = jobRun.getJobActivityGraph();
-        BitSet targetBitmap = new BitSet();
-        for (ActivityId ac1 : activities) {
-            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
-            int nProducers = ac1TaskStates.length;
-            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
-            if (outputConns != null) {
-                for (IConnectorDescriptor c : outputConns) {
-                    ConnectorDescriptorId cdId = c.getConnectorId();
-                    ActivityId ac2 = jag.getConsumerActivity(cdId);
-                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
-                    int nConsumers = ac2TaskStates.length;
-                    for (int i = 0; i < nProducers; ++i) {
-                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
-                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
-                                .getTaskId());
-                        if (cInfoList == null) {
-                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
-                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
-                        }
-                        Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
-                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
-                            TaskId targetTID = ac2TaskStates[j].getTaskId();
-                            cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
-                            IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
-                            if (cPolicy.requiresProducerConsumerCoscheduling()) {
-                                cluster.add(targetTID);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        TaskCluster[] taskClusters = buildTaskClusters(ac, activityPlanMap, taskClusterMap);
+        TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
+                ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
 
         for (TaskCluster tc : taskClusters) {
             Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
@@ -170,20 +155,88 @@
                 }
             }
         }
+        return taskClusters;
+    }
 
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Plan for " + ac);
-            LOGGER.info("Built " + taskClusters.length + " Task Clusters");
-            for (TaskCluster tc : taskClusters) {
-                LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+    private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
+            Map<ActivityId, ActivityPlan> activityPlanMap) {
+        List<Task> taskStates = new ArrayList<Task>();
+        for (ActivityId anId : ac.getActivities()) {
+            ActivityPlan ap = activityPlanMap.get(anId);
+            Task[] tasks = ap.getTasks();
+            for (Task t : tasks) {
+                taskStates.add(t);
+            }
+        }
+        TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
+                taskStates.toArray(new Task[taskStates.size()]));
+        for (Task t : tc.getTasks()) {
+            t.setTaskCluster(tc);
+        }
+        return new TaskCluster[] { tc };
+    }
+
+    private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
+            Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
+        Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityId ac1 : activities) {
+            Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            if (outputConns != null) {
+                for (IConnectorDescriptor c : outputConns) {
+                    ConnectorDescriptorId cdId = c.getConnectorId();
+                    ActivityId ac2 = jag.getConsumerActivity(cdId);
+                    Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
+                    int nConsumers = ac2TaskStates.length;
+                    for (int i = 0; i < nProducers; ++i) {
+                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                        List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+                                .getTaskId());
+                        if (cInfoList == null) {
+                            cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+                            taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+                        }
+                        for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+                            TaskId targetTID = ac2TaskStates[j].getTaskId();
+                            cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+                        }
+                    }
+                }
+            }
+        }
+        return taskConnectivity;
+    }
+
+    private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
+            Map<ActivityId, ActivityPlan> activityPlanMap,
+            Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
+        Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+        for (ActivityId anId : ac.getActivities()) {
+            ActivityPlan ap = activityPlanMap.get(anId);
+            Task[] tasks = ap.getTasks();
+            for (Task t : tasks) {
+                Set<TaskId> cluster = new HashSet<TaskId>();
+                TaskId tid = t.getTaskId();
+                cluster.add(tid);
+                taskClusterMap.put(tid, cluster);
             }
         }
 
-        ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
-    }
+        JobRun jobRun = ac.getJobRun();
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
+        for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
+            Set<TaskId> cluster = taskClusterMap.get(e.getKey());
+            for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
+                IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
+                if (cPolicy.requiresProducerConsumerCoscheduling()) {
+                    cluster.add(p.getLeft());
+                }
+            }
+        }
 
-    private TaskCluster[] buildTaskClusters(ActivityCluster ac, Map<ActivityId, ActivityPlan> activityPlanMap,
-            Map<TaskId, Set<TaskId>> taskClusterMap) {
         /*
          * taskClusterMap contains for every TID x, x -> { coscheduled consumer TIDs U x }
          * We compute the transitive closure of this relation to find the largest set of
@@ -273,8 +326,7 @@
         }
     }
 
-    private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
-            Map<ActivityId, ActivityPlan> taskMap) {
+    private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
         JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
         Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
         Set<ActivityId> activities = ac.getActivities();
@@ -300,7 +352,7 @@
                 }
             }
         }
-        return cPolicyMap;
+        ac.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
     }
 
     private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
index 2d8ae85..f6271fe 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
@@ -6,7 +6,7 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
@@ -14,10 +14,10 @@
     private static final Logger LOGGER = Logger.getLogger(ApplicationStateChangeWork.class.getName());
 
     private final ClusterControllerService ccs;
-    private final ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf;
+    private final CCNCFunctions.ApplicationStateChangeResponseFunction ascrf;
 
     public ApplicationStateChangeWork(ClusterControllerService ccs,
-            ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf) {
+            CCNCFunctions.ApplicationStateChangeResponseFunction ascrf) {
         this.ccs = ccs;
         this.ascrf = ascrf;
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 49a3be9..9e8d130 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
         Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
-                    NodeStatus.ALIVE));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE));
         }
         callback.setValue(result);
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index cba4b4b..e4e0aae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -25,7 +25,7 @@
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -46,7 +46,7 @@
         String id = reg.getNodeId();
 
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
-        NodeControllerFunctions.NodeRegistrationResult result = null;
+        CCNCFunctions.NodeRegistrationResult result = null;
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
 
@@ -69,9 +69,9 @@
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());
             params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
             params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
-            result = new NodeControllerFunctions.NodeRegistrationResult(params, null);
+            result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
-            result = new NodeControllerFunctions.NodeRegistrationResult(null, e);
+            result = new CCNCFunctions.NodeRegistrationResult(null, e);
         }
         ncIPCHandle.send(-1, result, null);
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
new file mode 100644
index 0000000..9ea4636
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -0,0 +1,792 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+public class CCNCFunctions {
+    private static final int FID_CODE_SIZE = 1;
+
+    public enum FunctionId { // ordinal() is cast to a byte and used as the wire tag by SerializerDeserializer
+        REGISTER_NODE,
+        UNREGISTER_NODE,
+        NOTIFY_JOBLET_CLEANUP,
+        NOTIFY_TASK_COMPLETE,
+        NOTIFY_TASK_FAILURE,
+        NODE_HEARTBEAT,
+        REPORT_PROFILE,
+        REGISTER_PARTITION_PROVIDER,
+        REGISTER_PARTITION_REQUEST,
+        APPLICATION_STATE_CHANGE_RESPONSE,
+        // NOTE: constant order defines the tag values; both peers must be built with the same ordering.
+        NODE_REGISTRATION_RESULT,
+        START_TASKS,
+        ABORT_TASKS,
+        CLEANUP_JOBLET,
+        CREATE_APPLICATION,
+        DESTROY_APPLICATION,
+        REPORT_PARTITION_AVAILABILITY,
+
+        OTHER // tag used for exceptions and for any payload that is not a Function
+    }
+
+    public static abstract class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+        // Tag of the concrete message; SerializerDeserializer writes its ordinal as the first payload byte.
+        public abstract FunctionId getFunctionId();
+    }
+
+    public static class RegisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.REGISTER_NODE; carries the NodeRegistration given to the constructor.
+        private final NodeRegistration reg;
+
+        public RegisterNodeFunction(NodeRegistration reg) {
+            this.reg = reg;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_NODE;
+        }
+
+        public NodeRegistration getNodeRegistration() {
+            return reg;
+        }
+    }
+
+    public static class UnregisterNodeFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.UNREGISTER_NODE; carries only the id of the node concerned.
+        private final String nodeId;
+
+        public UnregisterNodeFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.UNREGISTER_NODE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NotifyTaskCompleteFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.NOTIFY_TASK_COMPLETE: job id, task attempt id, node id and the task's profile.
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final TaskProfile statistics;
+
+        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.statistics = statistics;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_COMPLETE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public TaskProfile getStatistics() {
+            return statistics;
+        }
+    }
+
+    public static class NotifyTaskFailureFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.NOTIFY_TASK_FAILURE: job id, task attempt id, node id and a details string.
+        private final JobId jobId;
+        private final TaskAttemptId taskId;
+        private final String nodeId;
+        private final String details;
+
+        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+            this.jobId = jobId;
+            this.taskId = taskId;
+            this.nodeId = nodeId;
+            this.details = details;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_TASK_FAILURE;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public TaskAttemptId getTaskId() {
+            return taskId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getDetails() {
+            return details;
+        }
+    }
+
+    public static class NotifyJobletCleanupFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.NOTIFY_JOBLET_CLEANUP: the job id and the node id it refers to.
+        private final JobId jobId;
+        private final String nodeId;
+
+        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+            this.jobId = jobId;
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NOTIFY_JOBLET_CLEANUP;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    public static class NodeHeartbeatFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.NODE_HEARTBEAT: a node id together with its HeartbeatData.
+        private final String nodeId;
+        private final HeartbeatData hbData;
+
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+            this.nodeId = nodeId;
+            this.hbData = hbData;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public HeartbeatData getHeartbeatData() {
+            return hbData;
+        }
+    }
+
+    public static class ReportProfileFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.REPORT_PROFILE: a node id and a list of JobProfile objects (list is not copied).
+        private final String nodeId;
+        private final List<JobProfile> profiles;
+
+        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+            this.nodeId = nodeId;
+            this.profiles = profiles;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PROFILE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public List<JobProfile> getProfiles() {
+            return profiles;
+        }
+    }
+
+    public static class RegisterPartitionProviderFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.REGISTER_PARTITION_PROVIDER; has a hand-written binary codec (see below).
+        private final PartitionDescriptor partitionDescriptor;
+
+        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+            this.partitionDescriptor = partitionDescriptor;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_PROVIDER;
+        }
+
+        public PartitionDescriptor getPartitionDescriptor() {
+            return partitionDescriptor;
+        }
+        // Decodes length bytes at the buffer's current position (not advanced); needs an array-backed buffer.
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(),
+                    buffer.arrayOffset() + buffer.position(), length); // arrayOffset() is non-zero for slices
+            DataInputStream dis = new DataInputStream(bais);
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read nodeId
+            String nodeId = dis.readUTF();
+
+            // Read TaskAttemptId
+            TaskAttemptId taId = readTaskAttemptId(dis);
+
+            // Read reusable flag
+            boolean reusable = dis.readBoolean();
+
+            // Read Partition State
+            PartitionState state = readPartitionState(dis);
+
+            PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
+            pd.setState(state);
+            return new RegisterPartitionProviderFunction(pd);
+        }
+        // Writes the fields in exactly the order deserialize reads them; the FID byte is written by the caller.
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            PartitionDescriptor pd = fn.getPartitionDescriptor();
+
+            // Write PartitionId
+            writePartitionId(dos, pd.getPartitionId());
+
+            // Write nodeId
+            dos.writeUTF(pd.getNodeId());
+
+            // Write TaskAttemptId
+            writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
+
+            // Write reusable flag
+            dos.writeBoolean(pd.isReusable());
+
+            // Write Partition State
+            writePartitionState(dos, pd.getState());
+        }
+    }
+
+    public static class RegisterPartitionRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.REGISTER_PARTITION_REQUEST; has a hand-written binary codec (see below).
+        private final PartitionRequest partitionRequest;
+
+        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+            this.partitionRequest = partitionRequest;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REGISTER_PARTITION_REQUEST;
+        }
+
+        public PartitionRequest getPartitionRequest() {
+            return partitionRequest;
+        }
+        // Decodes length bytes at the buffer's current position (not advanced); needs an array-backed buffer.
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(),
+                    buffer.arrayOffset() + buffer.position(), length); // arrayOffset() is non-zero for slices
+            DataInputStream dis = new DataInputStream(bais);
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read nodeId
+            String nodeId = dis.readUTF();
+
+            // Read TaskAttemptId
+            TaskAttemptId taId = readTaskAttemptId(dis);
+
+            // Read Partition State
+            PartitionState state = readPartitionState(dis);
+
+            PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
+            return new RegisterPartitionRequestFunction(pr);
+        }
+        // Writes the fields in exactly the order deserialize reads them; the FID byte is written by the caller.
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            PartitionRequest pr = fn.getPartitionRequest();
+
+            // Write PartitionId
+            writePartitionId(dos, pr.getPartitionId());
+
+            // Write nodeId
+            dos.writeUTF(pr.getNodeId());
+
+            // Write TaskAttemptId
+            writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
+
+            // Write Partition State
+            writePartitionState(dos, pr.getMinimumState());
+        }
+    }
+
+    public static class ApplicationStateChangeResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.APPLICATION_STATE_CHANGE_RESPONSE: node id, application name and status.
+        private final String nodeId;
+        private final String appName;
+        private final ApplicationStatus status;
+
+        public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+            this.nodeId = nodeId;
+            this.appName = appName;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getApplicationName() {
+            return appName;
+        }
+
+        public ApplicationStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class NodeRegistrationResult extends Function {
+        private static final long serialVersionUID = 1L;
+        // Tagged NODE_REGISTRATION_RESULT. RegisterNodeWork builds it as (params, null) or (null, exception).
+        private final NodeParameters params;
+
+        private final Exception exception;
+
+        public NodeRegistrationResult(NodeParameters params, Exception exception) {
+            this.params = params;
+            this.exception = exception;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_REGISTRATION_RESULT;
+        }
+
+        public NodeParameters getNodeParameters() {
+            return params;
+        }
+
+        public Exception getException() {
+            return exception;
+        }
+    }
+
+    public static class StartTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.START_TASKS; arrays and collections are stored by reference, not copied.
+        private final String appName;
+        private final JobId jobId;
+        private final byte[] planBytes;
+        private final List<TaskAttemptDescriptor> taskDescriptors;
+        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+                List<TaskAttemptDescriptor> taskDescriptors,
+                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+            this.appName = appName;
+            this.jobId = jobId;
+            this.planBytes = planBytes;
+            this.taskDescriptors = taskDescriptors;
+            this.connectorPolicies = connectorPolicies;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_TASKS;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public byte[] getPlanBytes() {
+            return planBytes;
+        }
+
+        public List<TaskAttemptDescriptor> getTaskDescriptors() {
+            return taskDescriptors;
+        }
+
+        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+            return connectorPolicies;
+        }
+    }
+
+    public static class AbortTasksFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.ABORT_TASKS: a job id and the list of task attempt ids (list is not copied).
+        private final JobId jobId;
+        private final List<TaskAttemptId> tasks;
+
+        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+            this.jobId = jobId;
+            this.tasks = tasks;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.ABORT_TASKS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public List<TaskAttemptId> getTasks() {
+            return tasks;
+        }
+    }
+
+    public static class CleanupJobletFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.CLEANUP_JOBLET: a job id and the JobStatus passed to the constructor.
+        private final JobId jobId;
+        private final JobStatus status;
+
+        public CleanupJobletFunction(JobId jobId, JobStatus status) {
+            this.jobId = jobId;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CLEANUP_JOBLET;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+
+        public JobStatus getStatus() {
+            return status;
+        }
+    }
+
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.CREATE_APPLICATION: app name, deployHar flag and a byte[] stored by reference.
+        private final String appName;
+        private final boolean deployHar;
+        private final byte[] serializedDistributedState;
+
+        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+            this.appName = appName;
+            this.deployHar = deployHar;
+            this.serializedDistributedState = serializedDistributedState;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+
+        public boolean isDeployHar() {
+            return deployHar;
+        }
+
+        public byte[] getSerializedDistributedState() {
+            return serializedDistributedState;
+        }
+    }
+
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.DESTROY_APPLICATION; carries only the application name.
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+
+    public static class ReportPartitionAvailabilityFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Message tagged FunctionId.REPORT_PARTITION_AVAILABILITY; has a hand-written binary codec (see below).
+        private final PartitionId pid;
+        private final NetworkAddress networkAddress;
+
+        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+            this.pid = pid;
+            this.networkAddress = networkAddress;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.REPORT_PARTITION_AVAILABILITY;
+        }
+
+        public PartitionId getPartitionId() {
+            return pid;
+        }
+
+        public NetworkAddress getNetworkAddress() {
+            return networkAddress;
+        }
+        // Decodes length bytes at the buffer's current position (not advanced); needs an array-backed buffer.
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(),
+                    buffer.arrayOffset() + buffer.position(), length); // arrayOffset() is non-zero for slices
+            DataInputStream dis = new DataInputStream(bais);
+            // Read PartitionId
+            PartitionId pid = readPartitionId(dis);
+
+            // Read NetworkAddress
+            NetworkAddress networkAddress = readNetworkAddress(dis);
+
+            return new ReportPartitionAvailabilityFunction(pid, networkAddress);
+        }
+        // Writes the fields in exactly the order deserialize reads them; the FID byte is written by the caller.
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
+
+            DataOutputStream dos = new DataOutputStream(out);
+
+            // Write PartitionId
+            writePartitionId(dos, fn.getPartitionId());
+
+            // Write NetworkAddress
+            writeNetworkAddress(dos, fn.getNetworkAddress());
+        }
+    }
+
+    public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
+        private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
+        // Wire layout: 1 FID byte (FunctionId ordinal), then a custom body for the 3 partition messages, else Java serialization.
+        public SerializerDeserializer() {
+            javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
+        }
+        // Consumes the FID byte from the buffer, then decodes the remaining length - 1 bytes according to it.
+        @Override
+        public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+            if (length < FID_CODE_SIZE) {
+                throw new IllegalStateException("Message size too small: " + length);
+            }
+            byte fid = buffer.get();
+            return deserialize(fid, buffer, length - FID_CODE_SIZE);
+        }
+        // Same as deserializeObject, but rejects any FID other than OTHER (exceptions are always written as OTHER).
+        @Override
+        public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+            if (length < FID_CODE_SIZE) {
+                throw new IllegalStateException("Message size too small: " + length);
+            }
+            byte fid = buffer.get();
+            if (fid != FunctionId.OTHER.ordinal()) {
+                throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
+            }
+            return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+        }
+        // Function instances are tagged with their own FunctionId; every other object is tagged OTHER.
+        @Override
+        public byte[] serializeObject(Object object) throws Exception {
+            if (object instanceof Function) {
+                Function fn = (Function) object;
+                return serialize(object, (byte) fn.getFunctionId().ordinal());
+            } else {
+                return serialize(object, (byte) FunctionId.OTHER.ordinal());
+            }
+        }
+
+        @Override
+        public byte[] serializeException(Exception object) throws Exception {
+            return serialize(object, (byte) FunctionId.OTHER.ordinal());
+        }
+        // Builds the full payload: the FID byte followed by exactly one encoding of the object.
+        private byte[] serialize(Object object, byte fid) throws Exception {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            baos.write(fid);
+            serialize(baos, object, fid);
+            // No second write here: serialize(out, object, fid) already falls back to Java serialization.
+            baos.close();
+            return baos.toByteArray();
+        }
+        // Writes the body only: custom codec for the three partition messages, Java serialization otherwise.
+        private void serialize(OutputStream out, Object object, byte fid) throws Exception {
+            switch (FunctionId.values()[fid]) {
+                case REGISTER_PARTITION_PROVIDER:
+                    RegisterPartitionProviderFunction.serialize(out, object);
+                    return;
+
+                case REGISTER_PARTITION_REQUEST:
+                    RegisterPartitionRequestFunction.serialize(out, object);
+                    return;
+
+                case REPORT_PARTITION_AVAILABILITY:
+                    ReportPartitionAvailabilityFunction.serialize(out, object);
+                    return;
+            }
+            JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
+        }
+        // Mirror of serialize(out, object, fid); an fid outside the enum range throws ArrayIndexOutOfBoundsException.
+        private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+            switch (FunctionId.values()[fid]) {
+                case REGISTER_PARTITION_PROVIDER:
+                    return RegisterPartitionProviderFunction.deserialize(buffer, length);
+
+                case REGISTER_PARTITION_REQUEST:
+                    return RegisterPartitionRequestFunction.deserialize(buffer, length);
+
+                case REPORT_PARTITION_AVAILABILITY:
+                    return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+            }
+
+            return javaSerde.deserializeObject(buffer, length);
+        }
+    }
+
+    private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
+        // Field order mirrors writePartitionId: job id (long), connector id, sender index, receiver index (ints).
+        JobId job = new JobId(dis.readLong());
+        ConnectorDescriptorId connector = new ConnectorDescriptorId(dis.readInt());
+        int sender = dis.readInt();
+        int receiver = dis.readInt();
+        return new PartitionId(job, connector, sender, receiver);
+    }
+
+    private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
+        dos.writeLong(pid.getJobId().getId()); // one long + three ints; order must match readPartitionId
+        dos.writeInt(pid.getConnectorDescriptorId().getId());
+        dos.writeInt(pid.getSenderIndex());
+        dos.writeInt(pid.getReceiverIndex());
+    }
+
+    private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
+        // Four ints, in the order writeTaskAttemptId emits them: operator id, activity id, partition, attempt.
+        OperatorDescriptorId operator = new OperatorDescriptorId(dis.readInt());
+        ActivityId activity = new ActivityId(operator, dis.readInt());
+        TaskId task = new TaskId(activity, dis.readInt());
+        int attempt = dis.readInt();
+        TaskAttemptId attemptId = new TaskAttemptId(task, attempt);
+        return attemptId;
+    }
+
+    private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
+        // Wire layout: operator descriptor id, activity local id, partition, attempt (four ints).
+        TaskId task = taId.getTaskId();
+        ActivityId activity = task.getActivityId();
+        dos.writeInt(activity.getOperatorDescriptorId().getId());
+        dos.writeInt(activity.getLocalId());
+        dos.writeInt(task.getPartition());
+        dos.writeInt(taId.getAttempt());
+    }
+
+    private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
+        int ordinal = dis.readInt(); // the state travels as its enum ordinal
+        return PartitionState.values()[ordinal];
+    }
+
+    private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
+        dos.writeInt(state.ordinal()); // sent as the enum ordinal; readPartitionState indexes values() with it
+    }
+
+    private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
+        int bLen = dis.readInt(); // length prefix written by writeNetworkAddress
+        byte[] ipAddress = new byte[bLen];
+        dis.readFully(ipAddress); // read() may stop short and its count was ignored; readFully fills or throws EOFException
+        int port = dis.readInt();
+        NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+        return networkAddress;
+    }
+
+    private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
+        byte[] ipAddress = networkAddress.getIpAddress();
+        dos.writeInt(ipAddress.length); // length prefix so readNetworkAddress knows how many bytes to read
+        dos.write(ipAddress);
+        dos.writeInt(networkAddress.getPort());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
deleted file mode 100644
index 2b6792e..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
-import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
-
-public class ClusterControllerFunctions {
-    public enum FunctionId {
-        REGISTER_NODE,
-        UNREGISTER_NODE,
-        NOTIFY_TASK_COMPLETE,
-        NOTIFY_TASK_FAILURE,
-        NOTIFY_JOBLET_CLEANUP,
-        NODE_HEARTBEAT,
-        REPORT_PROFILE,
-        REGISTER_PARTITION_PROVIDER,
-        REGISTER_PARTITION_REQUEST,
-        APPLICATION_STATE_CHANGE_RESPONSE,
-    }
-
-    public static abstract class Function implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        public abstract FunctionId getFunctionId();
-    }
-
-    public static class RegisterNodeFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final NodeRegistration reg;
-
-        public RegisterNodeFunction(NodeRegistration reg) {
-            this.reg = reg;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_NODE;
-        }
-
-        public NodeRegistration getNodeRegistration() {
-            return reg;
-        }
-    }
-
-    public static class UnregisterNodeFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-
-        public UnregisterNodeFunction(String nodeId) {
-            this.nodeId = nodeId;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.UNREGISTER_NODE;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-    }
-
-    public static class NotifyTaskCompleteFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final TaskAttemptId taskId;
-        private final String nodeId;
-        private final TaskProfile statistics;
-
-        public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
-            this.jobId = jobId;
-            this.taskId = taskId;
-            this.nodeId = nodeId;
-            this.statistics = statistics;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NOTIFY_TASK_COMPLETE;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public TaskAttemptId getTaskId() {
-            return taskId;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public TaskProfile getStatistics() {
-            return statistics;
-        }
-    }
-
-    public static class NotifyTaskFailureFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final TaskAttemptId taskId;
-        private final String nodeId;
-        private final String details;
-
-        public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
-            this.jobId = jobId;
-            this.taskId = taskId;
-            this.nodeId = nodeId;
-            this.details = details;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NOTIFY_TASK_FAILURE;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public TaskAttemptId getTaskId() {
-            return taskId;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public String getDetails() {
-            return details;
-        }
-    }
-
-    public static class NotifyJobletCleanupFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final String nodeId;
-
-        public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
-            this.jobId = jobId;
-            this.nodeId = nodeId;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NOTIFY_JOBLET_CLEANUP;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-    }
-
-    public static class NodeHeartbeatFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-        private final HeartbeatData hbData;
-
-        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
-            this.nodeId = nodeId;
-            this.hbData = hbData;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NODE_HEARTBEAT;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public HeartbeatData getHeartbeatData() {
-            return hbData;
-        }
-    }
-
-    public static class ReportProfileFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-        private final List<JobProfile> profiles;
-
-        public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
-            this.nodeId = nodeId;
-            this.profiles = profiles;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_PROFILE;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public List<JobProfile> getProfiles() {
-            return profiles;
-        }
-    }
-
-    public static class RegisterPartitionProviderFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final PartitionDescriptor partitionDescriptor;
-
-        public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
-            this.partitionDescriptor = partitionDescriptor;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_PARTITION_PROVIDER;
-        }
-
-        public PartitionDescriptor getPartitionDescriptor() {
-            return partitionDescriptor;
-        }
-    }
-
-    public static class RegisterPartitionRequestFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final PartitionRequest partitionRequest;
-
-        public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
-            this.partitionRequest = partitionRequest;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_PARTITION_REQUEST;
-        }
-
-        public PartitionRequest getPartitionRequest() {
-            return partitionRequest;
-        }
-    }
-
-    public static class ApplicationStateChangeResponseFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String nodeId;
-        private final String appName;
-        private final ApplicationStatus status;
-
-        public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
-            this.nodeId = nodeId;
-            this.appName = appName;
-            this.status = status;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
-        }
-
-        public String getNodeId() {
-            return nodeId;
-        }
-
-        public String getApplicationName() {
-            return appName;
-        }
-
-        public ApplicationStatus getStatus() {
-            return status;
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index b7dd0af..a0dabdd 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -37,13 +37,13 @@
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
-        ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+        CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
                 nodeId);
         ipcHandle.send(-1, fn, null);
     }
@@ -51,56 +51,56 @@
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
                 jobId, taskId, nodeId, statistics);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
                 jobId, taskId, nodeId, details);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
                 jobId, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
                 hbData);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
                 profiles);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
-        ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+        CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
                 partitionDescriptor);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
-        ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+        CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
                 partitionRequest);
         ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
-        ClusterControllerFunctions.ApplicationStateChangeResponseFunction fn = new ClusterControllerFunctions.ApplicationStateChangeResponseFunction(
+        CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
                 nodeId, appName, status);
         ipcHandle.send(-1, fn, null);
     }
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
deleted file mode 100644
index b72d4e5..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-
-public class NodeControllerFunctions {
-    public enum FunctionId {
-        NODE_REGISTRATION_RESULT,
-        START_TASKS,
-        ABORT_TASKS,
-        CLEANUP_JOBLET,
-        CREATE_APPLICATION,
-        DESTROY_APPLICATION,
-        REPORT_PARTITION_AVAILABILITY
-    }
-
-    public static abstract class Function implements Serializable {
-        private static final long serialVersionUID = 1L;
-
-        public abstract FunctionId getFunctionId();
-    }
-
-    public static class NodeRegistrationResult extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final NodeParameters params;
-
-        private final Exception exception;
-
-        public NodeRegistrationResult(NodeParameters params, Exception exception) {
-            this.params = params;
-            this.exception = exception;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.NODE_REGISTRATION_RESULT;
-        }
-
-        public NodeParameters getNodeParameters() {
-            return params;
-        }
-
-        public Exception getException() {
-            return exception;
-        }
-    }
-
-    public static class StartTasksFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String appName;
-        private final JobId jobId;
-        private final byte[] planBytes;
-        private final List<TaskAttemptDescriptor> taskDescriptors;
-        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-
-        public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
-                List<TaskAttemptDescriptor> taskDescriptors,
-                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
-            this.appName = appName;
-            this.jobId = jobId;
-            this.planBytes = planBytes;
-            this.taskDescriptors = taskDescriptors;
-            this.connectorPolicies = connectorPolicies;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.START_TASKS;
-        }
-
-        public String getAppName() {
-            return appName;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public byte[] getPlanBytes() {
-            return planBytes;
-        }
-
-        public List<TaskAttemptDescriptor> getTaskDescriptors() {
-            return taskDescriptors;
-        }
-
-        public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
-            return connectorPolicies;
-        }
-    }
-
-    public static class AbortTasksFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final List<TaskAttemptId> tasks;
-
-        public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
-            this.jobId = jobId;
-            this.tasks = tasks;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.ABORT_TASKS;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public List<TaskAttemptId> getTasks() {
-            return tasks;
-        }
-    }
-
-    public static class CleanupJobletFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-        private final JobStatus status;
-
-        public CleanupJobletFunction(JobId jobId, JobStatus status) {
-            this.jobId = jobId;
-            this.status = status;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.CLEANUP_JOBLET;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public JobStatus getStatus() {
-            return status;
-        }
-    }
-
-    public static class CreateApplicationFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String appName;
-        private final boolean deployHar;
-        private final byte[] serializedDistributedState;
-
-        public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
-            this.appName = appName;
-            this.deployHar = deployHar;
-            this.serializedDistributedState = serializedDistributedState;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.CREATE_APPLICATION;
-        }
-
-        public String getAppName() {
-            return appName;
-        }
-
-        public boolean isDeployHar() {
-            return deployHar;
-        }
-
-        public byte[] getSerializedDistributedState() {
-            return serializedDistributedState;
-        }
-    }
-
-    public static class DestroyApplicationFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final String appName;
-
-        public DestroyApplicationFunction(String appName) {
-            this.appName = appName;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.DESTROY_APPLICATION;
-        }
-
-        public String getAppName() {
-            return appName;
-        }
-    }
-
-    public static class ReportPartitionAvailabilityFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final PartitionId pid;
-        private final NetworkAddress networkAddress;
-
-        public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
-            this.pid = pid;
-            this.networkAddress = networkAddress;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_PARTITION_AVAILABILITY;
-        }
-
-        public PartitionId getPartitionId() {
-            return pid;
-        }
-
-        public NetworkAddress getNetworkAddress() {
-            return networkAddress;
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 474fb93..961a9e3 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -38,20 +38,20 @@
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
-        NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId,
                 planBytes, taskDescriptors, connectorPolicies);
         ipcHandle.send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+        CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
         ipcHandle.send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+        CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId,
                 status);
         ipcHandle.send(-1, cjf, null);
     }
@@ -59,21 +59,21 @@
     @Override
     public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
             throws Exception {
-        NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+        CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(
                 appName, deployHar, serializedDistributedState);
         ipcHandle.send(-1, caf, null);
     }
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+        CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(
                 appName);
         ipcHandle.send(-1, daf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
-        NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+        CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
         ipcHandle.send(-1, rpaf, null);
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ba1d970..0f512a7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -50,8 +50,8 @@
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
 import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -124,7 +124,8 @@
         id = ncConfig.nodeId;
         executor = Executors.newCachedThreadPool();
         NodeControllerIPCI ipci = new NodeControllerIPCI();
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci);
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+                new CCNCFunctions.SerializerDeserializer());
         this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
         if (id == null) {
             throw new Exception("id not set");
@@ -347,49 +348,49 @@
     private final class NodeControllerIPCI implements IIPCI {
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
-            NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) payload;
+            CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case START_TASKS: {
-                    NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+                    CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
                     queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
                             .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
                     return;
                 }
 
                 case ABORT_TASKS: {
-                    NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+                    CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
                     queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
                     return;
                 }
 
                 case CLEANUP_JOBLET: {
-                    NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+                    CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
                     queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
                     return;
                 }
 
                 case CREATE_APPLICATION: {
-                    NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+                    CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
                     queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
                             .isDeployHar(), caf.getSerializedDistributedState()));
                     return;
                 }
 
                 case DESTROY_APPLICATION: {
-                    NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+                    CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
                     queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
                     return;
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
                             .getPartitionId(), rpaf.getNetworkAddress()));
                     return;
                 }
 
                 case NODE_REGISTRATION_RESULT: {
-                    NodeControllerFunctions.NodeRegistrationResult nrrf = (NodeControllerFunctions.NodeRegistrationResult) fn;
+                    CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
                     setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
                     return;
                 }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index f94be22..1a0a820 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -60,7 +60,7 @@
     public void start() throws IOException {
         md.start();
         InetSocketAddress sockAddr = md.getLocalAddress();
-        networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
     }
 
     public NetworkAddress getNetworkAddress() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index ce3f4e0..9734567 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.logging.Logger;
@@ -48,8 +49,8 @@
         Joblet ji = jobletMap.get(pid.getJobId());
         if (ji != null) {
             PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
-                    ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
-                            networkAddress.getPort()), pid, 5));
+                    ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                            .getIpAddress()), networkAddress.getPort()), pid, 5));
             ji.reportPartitionAvailability(channel);
         }
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 4ffc060..37154bc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -28,21 +28,33 @@
     private static final long serialVersionUID = 1L;
     protected final ConnectorDescriptorId id;
 
+    protected String displayName;
+
     public AbstractConnectorDescriptor(JobSpecification spec) {
         this.id = spec.createConnectorDescriptor();
         spec.getConnectorMap().put(id, this);
+        displayName = getClass().getName() + "[" + id + "]"; // default label "<runtime class name>[<id>]"; setDisplayName() can replace it
     }
 
     public ConnectorDescriptorId getConnectorId() {
         return id;
     }
 
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+    }
+
     @Override
     public JSONObject toJSON() throws JSONException {
         JSONObject jconn = new JSONObject();
 
         jconn.put("id", getConnectorId().getId());
         jconn.put("java-class", getClass().getName());
+        jconn.put("display-name", displayName);
 
         return jconn;
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 1bb13ca..1a60290 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -38,12 +38,15 @@
 
     protected final int outputArity;
 
+    protected String displayName;
+
     public AbstractOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity) {
         odId = spec.createOperatorDescriptorId();
         this.inputArity = inputArity;
         this.outputArity = outputArity;
         recordDescriptors = new RecordDescriptor[outputArity];
         spec.getOperatorMap().put(getOperatorId(), this);
+        displayName = getClass().getName() + "[" + odId + "]"; // default label "<runtime class name>[<operator id>]"; setDisplayName() can replace it
     }
 
     @Override
@@ -66,6 +69,14 @@
         return recordDescriptors;
     }
 
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+    }
+
     @Override
     public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
             ICCApplicationContext appCtx) {
@@ -79,6 +90,7 @@
         jop.put("java-class", getClass().getName());
         jop.put("in-arity", getInputArity());
         jop.put("out-arity", getOutputArity());
+        jop.put("display-name", displayName);
         return jop;
     }
 }
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..62648ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Strategy for turning IPC message payloads into bytes and back.
+ * <p>
+ * Ordinary payloads and exception payloads go through separate methods, so an
+ * implementation is free to encode the two kinds differently.
+ */
+public interface IPayloadSerializerDeserializer {
+    /**
+     * Decodes an ordinary payload.
+     *
+     * @param buffer
+     *            buffer whose current position marks the first payload byte
+     * @param length
+     *            number of payload bytes to decode
+     * @return the decoded payload
+     * @throws Exception
+     *             if the payload cannot be decoded
+     */
+    Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+
+    /**
+     * Decodes an exception payload.
+     *
+     * @param buffer
+     *            buffer whose current position marks the first payload byte
+     * @param length
+     *            number of payload bytes to decode
+     * @return the decoded exception
+     * @throws Exception
+     *             if the payload cannot be decoded
+     */
+    Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
+
+    /**
+     * Encodes an ordinary payload.
+     *
+     * @param object
+     *            payload to encode
+     * @return the encoded bytes
+     * @throws Exception
+     *             if the payload cannot be encoded
+     */
+    byte[] serializeObject(Object object) throws Exception;
+
+    /**
+     * Encodes an exception payload.
+     *
+     * @param object
+     *            exception to encode
+     * @return the encoded bytes
+     * @throws Exception
+     *             if the exception cannot be encoded
+     */
+    byte[] serializeException(Exception object) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index eb9cbd3..053ac6b 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -208,7 +208,7 @@
                                         SelectionKey key = handle.getKey();
                                         key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                                     } else {
-                                        if (buffer.position() == 0) {
+                                        if (!buffer.hasRemaining()) {
                                             handle.resizeOutBuffer();
                                             continue;
                                         }
@@ -251,7 +251,10 @@
                                 } else if (!writeBuffer.hasRemaining()) {
                                     key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                 }
-                                handle.clearFull();
+                                if (handle.full()) {
+                                    handle.clearFull();
+                                    selector.wakeup();
+                                }
                             } else if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
@@ -273,7 +276,7 @@
                             }
                         }
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index f3c05c6..3d3bc7a 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -54,6 +54,10 @@
         return remoteAddress;
     }
 
+    IPCSystem getIPCSystem() { // package-private; Message calls this to reach the system's payload serializer/deserializer
+        return system;
+    }
+
     void setRemoteAddress(InetSocketAddress remoteAddress) {
         this.remoteAddress = remoteAddress;
     }
@@ -161,14 +165,12 @@
         inBuffer.flip();
         ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
         readBuffer.put(inBuffer);
-        readBuffer.compact();
         inBuffer = readBuffer;
     }
 
     void resizeOutBuffer() {
         ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
         writeBuffer.put(outBuffer);
-        writeBuffer.compact();
         writeBuffer.flip();
         outBuffer = writeBuffer;
     }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 5f60c6d..6c9c82c 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 
 public class IPCSystem {
@@ -27,15 +28,15 @@
 
     private final IIPCI ipci;
 
+    private final IPayloadSerializerDeserializer serde;
+
     private final AtomicLong midFactory;
 
-    public IPCSystem(InetSocketAddress socketAddress) throws IOException {
-        this(socketAddress, null);
-    }
-
-    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci) throws IOException {
+    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
+            throws IOException {
         cMgr = new IPCConnectionManager(this, socketAddress);
         this.ipci = ipci;
+        this.serde = serde; // NOTE(review): not null-checked here; Message.read/write invoke methods on it
         midFactory = new AtomicLong();
     }
 
@@ -57,6 +58,10 @@
         }
     }
 
+    IPayloadSerializerDeserializer getSerializerDeserializer() { // package-private; returns the serde passed to the constructor
+        return serde;
+    }
+
     long createMessageId() {
         return midFactory.incrementAndGet();
     }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..fdf8e92
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -0,0 +1,92 @@
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
+/**
+ * Payload codec built on standard Java object serialization. Ordinary and
+ * exception payloads use the same encoding.
+ * <p>
+ * NOTE(review): decoding runs {@link ObjectInputStream#readObject()} on bytes
+ * received from the peer, which is unsafe if the peer is untrusted.
+ */
+public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+    @Override
+    public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+        return deserialize(buffer, length);
+    }
+
+    @Override
+    public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+        return (Exception) deserialize(buffer, length);
+    }
+
+    @Override
+    public byte[] serializeObject(Object object) throws Exception {
+        return serialize(object);
+    }
+
+    @Override
+    public byte[] serializeException(Exception exception) throws Exception {
+        return serialize(exception);
+    }
+
+    /**
+     * Writes {@code object} to {@code out} using Java serialization and flushes
+     * the serialization stream. {@code out} is not closed.
+     *
+     * @param out
+     *            destination stream
+     * @param object
+     *            object to write
+     * @throws Exception
+     *             if the object cannot be serialized or written
+     */
+    public static void serialize(OutputStream out, Object object) throws Exception {
+        ObjectOutputStream oos = new ObjectOutputStream(out);
+        oos.writeObject(object);
+        oos.flush();
+    }
+
+    /**
+     * Reads one object from {@code length} bytes of the buffer's backing array,
+     * starting at the buffer's current position. The buffer's position is not
+     * changed.
+     *
+     * @param buffer
+     *            array-backed buffer positioned at the first payload byte
+     * @param length
+     *            number of payload bytes
+     * @return the deserialized object
+     * @throws Exception
+     *             if the bytes cannot be deserialized, or the buffer has no
+     *             accessible backing array
+     */
+    private Object deserialize(ByteBuffer buffer, int length) throws Exception {
+        // position() is relative to the buffer, while array() exposes the whole
+        // backing array; arrayOffset() bridges the two (non-zero for slices).
+        int start = buffer.arrayOffset() + buffer.position();
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), start, length));
+        try {
+            return ois.readObject();
+        } finally {
+            // Close even when readObject() fails.
+            ois.close();
+        }
+    }
+
+    /**
+     * Serializes {@code object} into a fresh byte array.
+     */
+    private byte[] serialize(Object object) throws Exception {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        serialize(baos, object);
+        return baos.toByteArray();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
index 6bff56f..6bb3156 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -14,13 +14,10 @@
  */
 package edu.uci.ics.hyracks.ipc.impl;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
 class Message {
     private static final int MSG_SIZE_SIZE = 4;
 
@@ -92,29 +89,26 @@
         return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
     }
 
-    void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+    void read(ByteBuffer buffer) throws Exception { // widened because the pluggable serde methods declare "throws Exception"
         assert hasMessage(buffer);
         int msgSize = buffer.getInt();
         messageId = buffer.getLong();
         requestMessageId = buffer.getLong();
         flag = buffer.get();
         int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+        int length = msgSize - HEADER_SIZE; // payload byte count: msgSize minus HEADER_SIZE
         try {
-            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
-                    msgSize - HEADER_SIZE));
-            payload = ois.readObject();
-            ois.close();
+            IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+            payload = flag == ERROR ? serde.deserializeException(buffer, length) : serde.deserializeObject(buffer,
+                    length); // ERROR-flagged messages are decoded as exceptions, everything else as plain objects
         } finally {
             buffer.position(finalPosition);
         }
     }
 
-    boolean write(ByteBuffer buffer) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(payload);
-        oos.close();
-        byte[] bytes = baos.toByteArray();
+    boolean write(ByteBuffer buffer) throws Exception {
+        IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+        byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
         if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
             buffer.putInt(HEADER_SIZE + bytes.length);
             buffer.putLong(messageId);
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index dac93dd..5b2f660 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 public class IPCTest {
     @Test
@@ -80,10 +81,12 @@
                 });
             }
         };
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci);
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
     }
 
     private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci);
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
     }
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 99140db..7f6853b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -21,12 +21,16 @@
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.exceptions.NetException;
 
 public class ChannelControlBlock {
+    private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
+
     private final ChannelSet cSet;
 
     private final int channelId;
@@ -166,6 +170,9 @@
             public void close() {
                 synchronized (ChannelControlBlock.this) {
                     if (eos) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Received duplicate close() on channel: " + channelId);
+                        }
                         return;
                     }
                     eos = true;
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 238f2ee..1d71fac 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -178,6 +178,7 @@
             if (!writerState.performPendingWrite(sc)) {
                 return;
             }
+            pendingWriteEventsCounter.decrement();
         }
         int numCycles;
 
@@ -239,9 +240,11 @@
             }
             writeCCB.write(writerState);
             if (writerState.writePending()) {
+                pendingWriteEventsCounter.increment();
                 if (!writerState.performPendingWrite(sc)) {
                     return;
                 }
+                pendingWriteEventsCounter.decrement();
             }
         }
     }
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 37641cb..d73ba21 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -70,7 +70,7 @@
         ioThreads[targetThread].initiateConnection(remoteAddress);
     }
 
-    private void addIncomingConnection(SocketChannel channel) {
+    private void distributeIncomingConnection(SocketChannel channel) { // hands the channel to the IO thread chosen by getNextThread()
         int targetThread = getNextThread();
         ioThreads[targetThread].addIncomingConnection(channel);
     }
@@ -80,24 +80,23 @@
     }
 
     private class IOThread extends Thread {
-        private final List<InetSocketAddress>[] pendingConnections;
+        private final List<InetSocketAddress> pendingConnections;
 
-        private final List<SocketChannel>[] incomingConnections;
+        private final List<InetSocketAddress> workingPendingConnections;
 
-        private int writerIndex;
+        private final List<SocketChannel> incomingConnections;
 
-        private int readerIndex;
+        private final List<SocketChannel> workingIncomingConnections;
 
         private Selector selector;
 
         public IOThread() throws IOException {
             super("TCPEndpoint IO Thread");
             setPriority(MAX_PRIORITY);
-            this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
-                    new ArrayList<InetSocketAddress>() };
-            this.incomingConnections = new List[] { new ArrayList<SocketChannel>(), new ArrayList<SocketChannel>() };
-            writerIndex = 0;
-            readerIndex = 1;
+            this.pendingConnections = new ArrayList<InetSocketAddress>(); // filled by initiateConnection()
+            this.workingPendingConnections = new ArrayList<InetSocketAddress>(); // iterated and cleared by run()
+            this.incomingConnections = new ArrayList<SocketChannel>(); // filled by addIncomingConnection()
+            this.workingIncomingConnections = new ArrayList<SocketChannel>(); // iterated and cleared by run()
             selector = Selector.open();
         }
 
@@ -106,9 +105,9 @@
             while (true) {
                 try {
                     int n = selector.select();
-                    swapReadersAndWriters();
-                    if (!pendingConnections[readerIndex].isEmpty()) {
-                        for (InetSocketAddress address : pendingConnections[readerIndex]) {
+                    collectOutstandingWork();
+                    if (!workingPendingConnections.isEmpty()) {
+                        for (InetSocketAddress address : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
                             channel.configureBlocking(false);
                             if (!channel.connect(address)) {
@@ -118,10 +117,10 @@
                                 createConnection(key, channel);
                             }
                         }
-                        pendingConnections[readerIndex].clear();
+                        workingPendingConnections.clear();
                     }
-                    if (!incomingConnections[readerIndex].isEmpty()) {
-                        for (SocketChannel channel : incomingConnections[readerIndex]) {
+                    if (!workingIncomingConnections.isEmpty()) {
+                        for (SocketChannel channel : workingIncomingConnections) {
                             channel.configureBlocking(false);
                             SelectionKey sKey = channel.register(selector, 0);
                             TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
@@ -130,7 +129,7 @@
                                 connectionListener.acceptedConnection(connection);
                             }
                         }
-                        incomingConnections[readerIndex].clear();
+                        workingIncomingConnections.clear();
                     }
                     if (n > 0) {
                         Iterator<SelectionKey> i = selector.selectedKeys().iterator();
@@ -148,7 +147,7 @@
                             if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
-                                addIncomingConnection(channel);
+                                distributeIncomingConnection(channel);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
                                 if (channel.finishConnect()) {
@@ -171,12 +170,12 @@
         }
 
         synchronized void initiateConnection(InetSocketAddress remoteAddress) {
-            pendingConnections[writerIndex].add(remoteAddress);
+            pendingConnections.add(remoteAddress); // queued only; collectOutstandingWork() later moves it to the working list
             selector.wakeup();
         }
 
         synchronized void addIncomingConnection(SocketChannel channel) {
-            incomingConnections[writerIndex].add(channel);
+            incomingConnections.add(channel); // queued only; collectOutstandingWork() later moves it to the working list
             selector.wakeup();
         }
 
@@ -185,10 +184,15 @@
             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         }
 
-        private synchronized void swapReadersAndWriters() {
-            int temp = readerIndex;
-            readerIndex = writerIndex;
-            writerIndex = temp;
+        private synchronized void collectOutstandingWork() { // drains both request queues into the working lists while holding this IOThread's monitor
+            if (!pendingConnections.isEmpty()) {
+                workingPendingConnections.addAll(pendingConnections); // appended, so entries already in the working list are kept
+                pendingConnections.clear();
+            }
+            if (!incomingConnections.isEmpty()) {
+                workingIncomingConnections.addAll(incomingConnections); // appended, so entries already in the working list are kept
+                incomingConnections.clear();
+            }
         }
     }
 }
\ No newline at end of file