Made communication between Cluster Controller and Node Controller asynchronous

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1025 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index 06b76ef..922b0fd 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -81,9 +81,9 @@
     public static void runJob(JobSpecification spec) throws Exception {
         JobId jobId = hcc.createJob(AlgebricksConfig.HYRACKS_APP_NAME, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(spec.toJSON().toString());
-        cc.startJob(jobId);
+        hcc.start(jobId);
         AlgebricksConfig.ALGEBRICKS_LOGGER.info(jobId.toString());
-        cc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId);
     }
 
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index f74d06e..602288c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -21,82 +21,76 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 
 public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
     private final IIPCHandle ipcHandle;
 
-    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+    private final RPCInterface rpci;
+
+    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
         this.ipcHandle = ipcHandle;
+        this.rpci = rpci;
     }
 
     @Override
     public ClusterControllerInfo getClusterControllerInfo() throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
-        return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+        return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
     }
 
     @Override
     public void createApplication(String appName) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
                 appName);
-        sync.call(ipcHandle, caf);
+        rpci.call(ipcHandle, caf);
     }
 
     @Override
     public void startApplication(String appName) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
                 appName);
-        sync.call(ipcHandle, saf);
+        rpci.call(ipcHandle, saf);
     }
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
                 appName);
-        sync.call(ipcHandle, daf);
+        rpci.call(ipcHandle, daf);
     }
 
     @Override
     public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
                 appName, jobSpec, jobFlags);
-        return (JobId) sync.call(ipcHandle, cjf);
+        return (JobId) rpci.call(ipcHandle, cjf);
     }
 
     @Override
     public JobStatus getJobStatus(JobId jobId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
                 jobId);
-        return (JobStatus) sync.call(ipcHandle, gjsf);
+        return (JobStatus) rpci.call(ipcHandle, gjsf);
     }
 
     @Override
     public void startJob(JobId jobId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
                 jobId);
-        sync.call(ipcHandle, sjf);
+        rpci.call(ipcHandle, sjf);
     }
 
     @Override
     public void waitForCompletion(JobId jobId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
                 jobId);
-        sync.call(ipcHandle, wfcf);
+        rpci.call(ipcHandle, wfcf);
     }
 
     @Override
     public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
-        SyncRMI sync = new SyncRMI();
         HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
-        return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+        return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 1b38cb9..b474ee1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
 /**
@@ -63,10 +64,11 @@
      */
     public HyracksConnection(String ccHost, int ccPort) throws Exception {
         this.ccHost = ccHost;
-        ipc = new IPCSystem(new InetSocketAddress(0));
+        RPCInterface rpci = new RPCInterface();
+        ipc = new IPCSystem(new InetSocketAddress(0), rpci);
         ipc.start();
         IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
-        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
+        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
         ccInfo = hci.getClusterControllerInfo();
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index fd401b4..2578b88 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -16,11 +16,9 @@
 
 import java.io.File;
 import java.net.InetSocketAddress;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
@@ -31,23 +29,19 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
-import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.ICCContext;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.ipc.HyracksClientInterfaceDelegateIPCI;
-import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationDestroyWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
+import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
-import edu.uci.ics.hyracks.control.cc.work.GetJobStatusConditionVariableWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.JobCreateWork;
@@ -62,29 +56,21 @@
 import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
 import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
 import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
+import edu.uci.ics.hyracks.control.cc.work.WaitForJobCompletionWork;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerDelegateIPCI;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
-import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IPCResponder;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
-public class ClusterControllerService extends AbstractRemoteService implements IClusterController,
-        IHyracksClientInterface {
+public class ClusterControllerService extends AbstractRemoteService {
     private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
 
     private final CCConfig ccConfig;
@@ -132,11 +118,10 @@
         applications = new Hashtable<String, CCApplicationContext>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
         executor = Executors.newCachedThreadPool();
-        IIPCI ccIPCI = new ClusterControllerDelegateIPCI(this);
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI, executor);
-        IIPCI ciIPCI = new HyracksClientInterfaceDelegateIPCI(this);
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
-                executor);
+        IIPCI ccIPCI = new ClusterControllerIPCI();
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI);
+        IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
+        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI);
         webServer = new WebServer(this);
         activeRunMap = new HashMap<JobId, JobRun>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -203,7 +188,7 @@
     public Map<JobId, JobRun> getRunMapArchive() {
         return runMapArchive;
     }
-    
+
     public Map<String, Set<String>> getIpAddressNodeNameMap() {
         return ipAddressNodeNameMap;
     }
@@ -232,130 +217,16 @@
         return new JobId(jobCounter++);
     }
 
-    @Override
-    public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobId jobId = createJobId();
-        JobCreateWork jce = new JobCreateWork(this, jobId, appName, jobSpec, jobFlags);
-        workQueue.schedule(jce);
-        jce.sync();
-        return jobId;
-    }
-
-    @Override
-    public NodeParameters registerNode(NodeRegistration reg) throws Exception {
-        String id = reg.getNodeId();
-
-        IIPCHandle ncIPCHandle = clusterIPC.getHandle(reg.getNodeControllerAddress());
-        INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
-
-        NodeControllerState state = new NodeControllerState(nodeController, reg);
-        workQueue.scheduleAndSync(new RegisterNodeWork(this, id, state));
-        LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
-        NodeParameters params = new NodeParameters();
-        params.setClusterControllerInfo(info);
-        params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
-        params.setProfileDumpPeriod(ccConfig.profileDumpPeriod);
-        return params;
-    }
-
-    @Override
-    public void unregisterNode(String nodeId) throws Exception {
-        workQueue.schedule(new UnregisterNodeWork(this, nodeId));
-    }
-
-    @Override
-    public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
-            throws Exception {
-        TaskCompleteWork sce = new TaskCompleteWork(this, jobId, taskId, nodeId, statistics);
-        workQueue.schedule(sce);
-    }
-
-    @Override
-    public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        TaskFailureWork tfe = new TaskFailureWork(this, jobId, taskId, nodeId, details);
-        workQueue.schedule(tfe);
-    }
-
-    @Override
-    public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        JobletCleanupNotificationWork jcnw = new JobletCleanupNotificationWork(this, jobId, nodeId);
-        workQueue.schedule(jcnw);
-    }
-
-    @Override
-    public JobStatus getJobStatus(JobId jobId) throws Exception {
-        GetJobStatusWork gse = new GetJobStatusWork(this, jobId);
-        workQueue.scheduleAndSync(gse);
-        return gse.getStatus();
-    }
-
-    @Override
-    public void startJob(JobId jobId) throws Exception {
-        JobStartWork jse = new JobStartWork(this, jobId);
-        workQueue.schedule(jse);
-    }
-
-    @Override
-    public void waitForCompletion(JobId jobId) throws Exception {
-        GetJobStatusConditionVariableWork e = new GetJobStatusConditionVariableWork(this, jobId);
-        workQueue.scheduleAndSync(e);
-        IJobStatusConditionVariable var = e.getConditionVariable();
-        if (var != null) {
-            var.waitForCompletion();
-        }
-    }
-
-    @Override
-    public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        workQueue.schedule(new ReportProfilesWork(this, profiles));
-    }
-
-    @Override
-    public synchronized void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        workQueue.schedule(new NodeHeartbeatWork(this, id, hbData));
-    }
-
-    @Override
-    public void createApplication(String appName) throws Exception {
-        FutureValue<Object> fv = new FutureValue<Object>();
-        workQueue.schedule(new ApplicationCreateWork(this, appName, fv));
-        fv.get();
-    }
-
-    @Override
-    public void destroyApplication(String appName) throws Exception {
-        FutureValue<Object> fv = new FutureValue<Object>();
-        workQueue.schedule(new ApplicationDestroyWork(this, appName, fv));
-        fv.get();
-    }
-
-    @Override
-    public void startApplication(final String appName) throws Exception {
-        FutureValue<Object> fv = new FutureValue<Object>();
-        workQueue.schedule(new ApplicationStartWork(this, appName, fv));
-        fv.get();
-    }
-
-    @Override
-    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+    public ClusterControllerInfo getClusterControllerInfo() {
         return info;
     }
 
-    @Override
-    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
-        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
-        workQueue.schedule(new GetNodeControllersInfoWork(this, fv));
-        return fv.get();
+    public CCConfig getCCConfig() {
+        return ccConfig;
     }
 
-    @Override
-    public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) {
-        workQueue.schedule(new RegisterPartitionAvailibilityWork(this, partitionDescriptor));
-    }
-
-    @Override
-    public void registerPartitionRequest(PartitionRequest partitionRequest) {
-        workQueue.schedule(new RegisterPartitionRequestWork(this, partitionRequest));
+    public IPCSystem getClusterIPC() {
+        return clusterIPC;
     }
 
     private class DeadNodeSweeper extends TimerTask {
@@ -364,4 +235,156 @@
             workQueue.schedule(new RemoveDeadNodesWork(ClusterControllerService.this));
         }
     }
+
+    private class HyracksClientInterfaceIPCI implements IIPCI {
+        @Override
+        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+            HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
+            switch (fn.getFunctionId()) {
+                case GET_CLUSTER_CONTROLLER_INFO: {
+                    try {
+                        handle.send(mid, info, null);
+                    } catch (IPCException e) {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
+
+                case CREATE_APPLICATION: {
+                    HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
+                    workQueue.schedule(new ApplicationCreateWork(ClusterControllerService.this, caf.getAppName(),
+                            new IPCResponder<Object>(handle, mid)));
+                    return;
+                }
+
+                case START_APPLICATION: {
+                    HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
+                    workQueue.schedule(new ApplicationStartWork(ClusterControllerService.this, saf.getAppName(),
+                            new IPCResponder<Object>(handle, mid)));
+                    return;
+                }
+
+                case DESTROY_APPLICATION: {
+                    HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
+                    workQueue.schedule(new ApplicationDestroyWork(ClusterControllerService.this, daf.getAppName(),
+                            new IPCResponder<Object>(handle, mid)));
+                    return;
+                }
+
+                case CREATE_JOB: {
+                    HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
+                    JobId jobId = createJobId();
+                    workQueue.schedule(new JobCreateWork(ClusterControllerService.this, jobId, cjf.getAppName(), cjf
+                            .getJobSpec(), cjf.getJobFlags(), new IPCResponder<JobId>(handle, mid)));
+                    return;
+                }
+
+                case GET_JOB_STATUS: {
+                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                    workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
+                            new IPCResponder<JobStatus>(handle, mid)));
+                    return;
+                }
+
+                case START_JOB: {
+                    HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                    workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getJobId(),
+                            new IPCResponder<Object>(handle, mid)));
+                    return;
+                }
+
+                case WAIT_FOR_COMPLETION: {
+                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                    workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
+                            new IPCResponder<Object>(handle, mid)));
+                    return;
+                }
+
+                case GET_NODE_CONTROLLERS_INFO: {
+                    workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
+                            new IPCResponder<Map<String, NodeControllerInfo>>(handle, mid)));
+                    return;
+                }
+            }
+            try {
+                handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
+            } catch (IPCException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private class ClusterControllerIPCI implements IIPCI {
+        @Override
+        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+            ClusterControllerFunctions.Function fn = (Function) payload;
+            switch (fn.getFunctionId()) {
+                case REGISTER_NODE: {
+                    ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+                    workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
+                    return;
+                }
+
+                case UNREGISTER_NODE: {
+                    ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+                    workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
+                    return;
+                }
+
+                case NODE_HEARTBEAT: {
+                    ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+                    workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
+                            .getHeartbeatData()));
+                    return;
+                }
+
+                case NOTIFY_JOBLET_CLEANUP: {
+                    ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+                    workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
+                            njcf.getJobId(), njcf.getNodeId()));
+                    return;
+                }
+
+                case REPORT_PROFILE: {
+                    ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+                    workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
+                    return;
+                }
+
+                case NOTIFY_TASK_COMPLETE: {
+                    ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+                    workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
+                            .getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+                    return;
+                }
+                case NOTIFY_TASK_FAILURE: {
+                    ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+                    workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
+                            .getTaskId(), ntff.getDetails(), ntff.getDetails()));
+                    return;
+                }
+
+                case REGISTER_PARTITION_PROVIDER: {
+                    ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+                    workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
+                            .getPartitionDescriptor()));
+                    return;
+                }
+
+                case REGISTER_PARTITION_REQUEST: {
+                    ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+                    workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
+                            .getPartitionRequest()));
+                    return;
+                }
+
+                case APPLICATION_STATE_CHANGE_RESPONSE: {
+                    ClusterControllerFunctions.ApplicationStateChangeResponseFunction astrf = (ClusterControllerFunctions.ApplicationStateChangeResponseFunction) fn;
+                    workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
+                    return;
+                }
+            }
+            LOGGER.warning("Unknown function: " + fn.getFunctionId());
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index c1c0161..28f7c59 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -17,7 +17,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCBootstrap;
@@ -30,10 +32,17 @@
 import edu.uci.ics.hyracks.control.cc.job.DeserializingJobSpecificationFactory;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
     private final ICCContext ccContext;
 
+    protected final Set<String> initPendingNodeIds;
+    protected final Set<String> deinitPendingNodeIds;
+
+    protected IResultCallback<Object> initializationCallback;
+    protected IResultCallback<Object> deinitializationCallback;
+
     private IJobSpecificationFactory jobSpecFactory;
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
@@ -41,6 +50,8 @@
     public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
         super(serverCtx, appName);
         this.ccContext = ccContext;
+        initPendingNodeIds = new HashSet<String>();
+        deinitPendingNodeIds = new HashSet<String>();
         jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
     }
@@ -98,4 +109,28 @@
             l.notifyJobCreation(jobId, specification);
         }
     }
+
+    public Set<String> getInitializationPendingNodeIds() {
+        return initPendingNodeIds;
+    }
+
+    public Set<String> getDeinitializationPendingNodeIds() {
+        return deinitPendingNodeIds;
+    }
+
+    public IResultCallback<Object> getInitializationCallback() {
+        return initializationCallback;
+    }
+
+    public void setInitializationCallback(IResultCallback<Object> initializationCallback) {
+        this.initializationCallback = initializationCallback;
+    }
+
+    public IResultCallback<Object> getDeinitializationCallback() {
+        return deinitializationCallback;
+    }
+
+    public void setDeinitializationCallback(IResultCallback<Object> deinitializationCallback) {
+        this.deinitializationCallback = deinitializationCallback;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
deleted file mode 100644
index b3aa406..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.ipc;
-
-import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
-import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IIPCI;
-
-public class HyracksClientInterfaceDelegateIPCI implements IIPCI {
-    private final IHyracksClientInterface hci;
-
-    public HyracksClientInterfaceDelegateIPCI(IHyracksClientInterface hci) {
-        this.hci = hci;
-    }
-
-    @Override
-    public Object call(IIPCHandle caller, Object req) throws Exception {
-        HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) req;
-        switch (fn.getFunctionId()) {
-            case GET_CLUSTER_CONTROLLER_INFO: {
-                return hci.getClusterControllerInfo();
-            }
-
-            case CREATE_APPLICATION: {
-                HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
-                hci.createApplication(caf.getAppName());
-                return null;
-            }
-
-            case START_APPLICATION: {
-                HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
-                hci.startApplication(saf.getAppName());
-                return null;
-            }
-
-            case DESTROY_APPLICATION: {
-                HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
-                hci.destroyApplication(daf.getAppName());
-                return null;
-            }
-
-            case CREATE_JOB: {
-                HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
-                return hci.createJob(cjf.getAppName(), cjf.getJobSpec(), cjf.getJobFlags());
-            }
-
-            case GET_JOB_STATUS: {
-                HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
-                return hci.getJobStatus(gjsf.getJobId());
-            }
-
-            case START_JOB: {
-                HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                hci.startJob(sjf.getJobId());
-                return null;
-            }
-
-            case WAIT_FOR_COMPLETION: {
-                HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
-                hci.waitForCompletion(wfcf.getJobId());
-                return null;
-            }
-
-            case GET_NODE_CONTROLLERS_INFO: {
-                return hci.getNodeControllersInfo();
-            }
-        }
-        throw new IllegalArgumentException("Unknown function " + fn.getFunctionId());
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteOp.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteOp.java
deleted file mode 100644
index ec15186..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteOp.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote;
-
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public interface RemoteOp<T> {
-    public String getNodeId();
-
-    public T execute(INodeController node) throws Exception;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
deleted file mode 100644
index a37c786..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/RemoteRunner.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote;
-
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
-
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class RemoteRunner {
-    public static <T, R> R runRemote(ClusterControllerService ccs, final RemoteOp<T>[] remoteOps,
-            final Accumulator<T, R> accumulator) throws Exception {
-        final Semaphore installComplete = new Semaphore(remoteOps.length);
-        final List<Exception> errors = new Vector<Exception>();
-        for (final RemoteOp<T> remoteOp : remoteOps) {
-            NodeControllerState nodeState = ccs.getNodeMap().get(remoteOp.getNodeId());
-            final INodeController node = nodeState.getNodeController();
-
-            installComplete.acquire();
-            Runnable remoteRunner = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        T t = remoteOp.execute(node);
-                        if (accumulator != null) {
-                            synchronized (accumulator) {
-                                accumulator.accumulate(t);
-                            }
-                        }
-                    } catch (Exception e) {
-                        errors.add(e);
-                    } finally {
-                        installComplete.release();
-                    }
-                }
-            };
-
-            ccs.getExecutor().execute(remoteRunner);
-        }
-        installComplete.acquire(remoteOps.length);
-        if (!errors.isEmpty()) {
-            throw errors.get(0);
-        }
-        return accumulator == null ? null : accumulator.getResult();
-    }
-}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationDestroyer.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationDestroyer.java
deleted file mode 100644
index 52e726c..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationDestroyer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class ApplicationDestroyer implements RemoteOp<Void> {
-    private String nodeId;
-    private String appName;
-
-    public ApplicationDestroyer(String nodeId, String appName) {
-        this.nodeId = nodeId;
-        this.appName = appName;
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public Void execute(INodeController node) throws Exception {
-        node.destroyApplication(appName);
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return "Destroyed application: " + appName;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationStarter.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationStarter.java
deleted file mode 100644
index d6a1d26..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/ApplicationStarter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class ApplicationStarter implements RemoteOp<Void> {
-    private String nodeId;
-    private String appName;
-    private boolean deployHar;
-    private byte[] distributedState;
-
-    public ApplicationStarter(String nodeId, String appName, boolean deployHar, byte[] distributedState) {
-        this.nodeId = nodeId;
-        this.appName = appName;
-        this.deployHar = deployHar;
-        this.distributedState = distributedState;
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public Void execute(INodeController node) throws Exception {
-        node.createApplication(appName, deployHar, distributedState);
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return "Started application: " + appName;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
deleted file mode 100644
index d0f9151..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-
-public class JobCompleteNotifier implements RemoteOp<Void> {
-    private String nodeId;
-    private JobId jobId;
-    private JobStatus status;
-
-    public JobCompleteNotifier(String nodeId, JobId jobId, JobStatus status) {
-        this.nodeId = nodeId;
-        this.jobId = jobId;
-        this.status = status;
-    }
-
-    @Override
-    public Void execute(INodeController node) throws Exception {
-        node.cleanUpJoblet(jobId, status);
-        return null;
-    }
-
-    @Override
-    public String toString() {
-        return jobId + " Cleaning Up";
-    }
-
-    @Override
-    public String getNodeId() {
-        return nodeId;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
index 3c8f7d0..15d6d1f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
@@ -20,34 +20,42 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 public class ApplicationCreateWork extends AbstractWork {
     private final ClusterControllerService ccs;
     private final String appName;
-    private FutureValue<Object> fv;
+    private IResultCallback<Object> callback;
 
-    public ApplicationCreateWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+    public ApplicationCreateWork(ClusterControllerService ccs, String appName, IResultCallback<Object> callback) {
         this.ccs = ccs;
         this.appName = appName;
-        this.fv = fv;
+        this.callback = callback;
     }
 
     @Override
     public void run() {
-        Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
-        if (applications.containsKey(appName)) {
-            fv.setException(new HyracksException("Duplicate application with name: " + appName + " being created."));
-        }
-        CCApplicationContext appCtx;
         try {
-            appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
-        } catch (IOException e) {
-            fv.setException(e);
-            return;
+            Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
+            if (applications.containsKey(appName)) {
+                callback.setException(new HyracksException("Duplicate application with name: " + appName
+                        + " being created."));
+                return;
+            }
+            CCApplicationContext appCtx;
+            try {
+                appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
+            } catch (IOException e) {
+                callback.setException(e);
+                return;
+            }
+            appCtx.setStatus(ApplicationStatus.CREATED);
+            applications.put(appName, appCtx);
+            callback.setValue(null);
+        } catch (Exception e) {
+            callback.setException(e);
         }
-        applications.put(appName, appCtx);
-        fv.setValue(null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
index c6af1b9..ac780c4 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationDestroyWork.java
@@ -14,62 +14,51 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
-import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationDestroyer;
-import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 public class ApplicationDestroyWork extends AbstractWork {
     private final ClusterControllerService ccs;
     private final String appName;
-    private FutureValue<Object> fv;
+    private IResultCallback<Object> callback;
 
-    public ApplicationDestroyWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+    public ApplicationDestroyWork(ClusterControllerService ccs, String appName, IResultCallback<Object> callback) {
         this.ccs = ccs;
         this.appName = appName;
-        this.fv = fv;
+        this.callback = callback;
     }
 
     @Override
     public void run() {
-        final ApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
-        if (appCtx == null) {
-            fv.setException(new HyracksException("No application with name: " + appName));
-            return;
-        }
-        List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
-        for (final String nodeId : ccs.getNodeMap().keySet()) {
-            opList.add(new ApplicationDestroyer(nodeId, appName));
-        }
-        final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
-        ccs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    RemoteRunner.runRemote(ccs, ops, null);
-                } catch (Exception e) {
-                    fv.setException(e);
-                    return;
-                }
-                ccs.getWorkQueue().schedule(new AbstractWork() {
-                    @Override
-                    public void run() {
-                        try {
-                            appCtx.deinitialize();
-                        } catch (Exception e) {
-                            fv.setException(e);
-                        }
-                        fv.setValue(null);
-                    }
-                });
+        try {
+            final CCApplicationContext appCtx = ccs.getApplicationMap().remove(appName);
+            if (appCtx == null) {
+                callback.setException(new HyracksException("No application with name: " + appName));
+                return;
             }
-        });
+            if (appCtx.getStatus() == ApplicationStatus.IN_DEINITIALIZATION
+                    || appCtx.getStatus() == ApplicationStatus.DEINITIALIZED) {
+                return;
+            }
+            Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+            appCtx.getDeinitializationPendingNodeIds().addAll(nodeMap.keySet());
+            appCtx.setStatus(ApplicationStatus.IN_DEINITIALIZATION);
+            appCtx.setDeinitializationCallback(callback);
+            for (String nodeId : ccs.getNodeMap().keySet()) {
+                NodeControllerState nodeState = nodeMap.get(nodeId);
+                final INodeController node = nodeState.getNodeController();
+                node.destroyApplication(appName);
+            }
+        } catch (Exception e) {
+            callback.setException(e);
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
index 637c55d..e4ad56c 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStartWork.java
@@ -14,56 +14,67 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
-import edu.uci.ics.hyracks.control.cc.remote.ops.ApplicationStarter;
-import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 public class ApplicationStartWork extends AbstractWork {
     private final ClusterControllerService ccs;
     private final String appName;
-    private final FutureValue<Object> fv;
+    private final IResultCallback<Object> callback;
 
-    public ApplicationStartWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+    public ApplicationStartWork(ClusterControllerService ccs, String appName, IResultCallback<Object> callback) {
         this.ccs = ccs;
         this.appName = appName;
-        this.fv = fv;
+        this.callback = callback;
     }
 
     @Override
     public void run() {
-        final ApplicationContext appCtx = ccs.getApplicationMap().get(appName);
-        if (appCtx == null) {
-            fv.setException(new HyracksException("No application with name: " + appName));
-            return;
-        }
-        ccs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    appCtx.initializeClassPath();
-                    appCtx.initialize();
-                    final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDistributedState());
-                    final boolean deployHar = appCtx.containsHar();
-                    List<RemoteOp<Void>> opList = new ArrayList<RemoteOp<Void>>();
-                    for (final String nodeId : ccs.getNodeMap().keySet()) {
-                        opList.add(new ApplicationStarter(nodeId, appName, deployHar, distributedState));
-                    }
-                    final RemoteOp[] ops = opList.toArray(new RemoteOp[opList.size()]);
-                    RemoteRunner.runRemote(ccs, ops, null);
-                    fv.setValue(null);
-                } catch (Exception e) {
-                    fv.setException(e);
-                }
+        try {
+            final CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
+            if (appCtx == null) {
+                callback.setException(new HyracksException("No application with name: " + appName));
+                return;
             }
-        });
+            if (appCtx.getStatus() != ApplicationStatus.CREATED) {
+                callback.setException(new HyracksException("Application in incorrect state for starting: "
+                        + appCtx.getStatus()));
+            }
+            final Map<String, NodeControllerState> nodeMapCopy = new HashMap<String, NodeControllerState>(
+                    ccs.getNodeMap());
+            appCtx.getInitializationPendingNodeIds().addAll(nodeMapCopy.keySet());
+            appCtx.setStatus(ApplicationStatus.IN_INITIALIZATION);
+            appCtx.setInitializationCallback(callback);
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        appCtx.initializeClassPath();
+                        appCtx.initialize();
+                        final byte[] distributedState = JavaSerializationUtils.serialize(appCtx.getDistributedState());
+                        final boolean deployHar = appCtx.containsHar();
+                        for (final String nodeId : nodeMapCopy.keySet()) {
+                            NodeControllerState nodeState = nodeMapCopy.get(nodeId);
+                            final INodeController node = nodeState.getNodeController();
+                            node.createApplication(appName, deployHar, distributedState);
+                        }
+                    } catch (Exception e) {
+                        callback.setException(e);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            callback.setException(e);
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
new file mode 100644
index 0000000..2d8ae85
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
@@ -0,0 +1,85 @@
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.Set;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+
+public class ApplicationStateChangeWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(ApplicationStateChangeWork.class.getName());
+
+    private final ClusterControllerService ccs;
+    private final ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf;
+
+    public ApplicationStateChangeWork(ClusterControllerService ccs,
+            ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf) {
+        this.ccs = ccs;
+        this.ascrf = ascrf;
+    }
+
+    @Override
+    public void run() {
+        final CCApplicationContext appCtx = ccs.getApplicationMap().get(ascrf.getApplicationName());
+        if (appCtx == null) {
+            LOGGER.warning("Got ApplicationStateChangeResponse for application " + ascrf.getApplicationName()
+                    + " that does not exist");
+            return;
+        }
+        switch (ascrf.getStatus()) {
+            case INITIALIZED: {
+                Set<String> pendingNodeIds = appCtx.getInitializationPendingNodeIds();
+                boolean changed = pendingNodeIds.remove(ascrf.getNodeId());
+                if (!changed) {
+                    LOGGER.warning("Got ApplicationStateChangeResponse for application " + ascrf.getApplicationName()
+                            + " from unexpected node " + ascrf.getNodeId() + " to state " + ascrf.getStatus());
+                    return;
+                }
+                if (pendingNodeIds.isEmpty()) {
+                    appCtx.setStatus(ApplicationStatus.INITIALIZED);
+                    IResultCallback<Object> callback = appCtx.getInitializationCallback();
+                    appCtx.setInitializationCallback(null);
+                    callback.setValue(null);
+                }
+                return;
+            }
+
+            case DEINITIALIZED: {
+                Set<String> pendingNodeIds = appCtx.getDeinitializationPendingNodeIds();
+                boolean changed = pendingNodeIds.remove(ascrf.getNodeId());
+                if (!changed) {
+                    LOGGER.warning("Got ApplicationStateChangeResponse for application " + ascrf.getApplicationName()
+                            + " from unexpected node " + ascrf.getNodeId() + " to state " + ascrf.getStatus());
+                    return;
+                }
+                if (pendingNodeIds.isEmpty()) {
+                    appCtx.setStatus(ApplicationStatus.DEINITIALIZED);
+                    ccs.getExecutor().execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                appCtx.deinitialize();
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                            }
+                            ccs.getWorkQueue().schedule(new AbstractWork() {
+                                @Override
+                                public void run() {
+                                    ccs.getApplicationMap().remove(ascrf.getApplicationName());
+                                    IResultCallback<Object> callback = appCtx.getDeinitializationCallback();
+                                    appCtx.setDeinitializationCallback(null);
+                                    callback.setValue(null);
+                                }
+                            });
+                        }
+                    });
+                }
+                return;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
deleted file mode 100644
index ff67928..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusConditionVariableWork.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetJobStatusConditionVariableWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private IJobStatusConditionVariable cVar;
-
-    public GetJobStatusConditionVariableWork(ClusterControllerService ccs, JobId jobId) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-    }
-
-    @Override
-    protected void doRun() throws Exception {
-        cVar = ccs.getActiveRunMap().get(jobId);
-        if (cVar == null) {
-            cVar = ccs.getRunMapArchive().get(jobId);
-        }
-    }
-
-    public IJobStatusConditionVariable getConditionVariable() {
-        return cVar;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
index 7a8943a..c70e24f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobStatusWork.java
@@ -18,28 +18,31 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
 public class GetJobStatusWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final JobId jobId;
-    private JobStatus status;
+    private final IResultCallback<JobStatus> callback;
 
-    public GetJobStatusWork(ClusterControllerService ccs, JobId jobId) {
+    public GetJobStatusWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobStatus> callback) {
         this.ccs = ccs;
         this.jobId = jobId;
+        this.callback = callback;
     }
 
     @Override
     protected void doRun() throws Exception {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            run = ccs.getRunMapArchive().get(jobId);
+        try {
+            JobRun run = ccs.getActiveRunMap().get(jobId);
+            if (run == null) {
+                run = ccs.getRunMapArchive().get(jobId);
+            }
+            JobStatus status = run == null ? null : run.getStatus();
+            callback.setValue(status);
+        } catch (Exception e) {
+            callback.setException(e);
         }
-        status = run == null ? null : run.getStatus();
-    }
-
-    public JobStatus getStatus() {
-        return status;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 48d9b84..49a3be9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -22,15 +22,16 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 public class GetNodeControllersInfoWork extends AbstractWork {
     private final ClusterControllerService ccs;
-    private FutureValue<Map<String, NodeControllerInfo>> fv;
+    private IResultCallback<Map<String, NodeControllerInfo>> callback;
 
-    public GetNodeControllersInfoWork(ClusterControllerService ccs, FutureValue<Map<String, NodeControllerInfo>> fv) {
+    public GetNodeControllersInfoWork(ClusterControllerService ccs,
+            IResultCallback<Map<String, NodeControllerInfo>> callback) {
         this.ccs = ccs;
-        this.fv = fv;
+        this.callback = callback;
     }
 
     @Override
@@ -41,6 +42,6 @@
             result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
                     NodeStatus.ALIVE));
         }
-        fv.setValue(result);
+        callback.setValue(result);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index e30a718..07088bb 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -20,9 +20,8 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
-import edu.uci.ics.hyracks.control.cc.remote.ops.JobCompleteNotifier;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 public class JobCleanupWork extends AbstractWork {
@@ -54,22 +53,13 @@
         Set<String> targetNodes = run.getParticipatingNodeIds();
         run.getCleanupPendingNodeIds().addAll(targetNodes);
         run.setPendingStatus(status, exception);
-        final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
-        int i = 0;
         for (String n : targetNodes) {
-            jcns[i++] = new JobCompleteNotifier(n, jobId, status);
-        }
-        ccs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                if (jcns.length > 0) {
-                    try {
-                        RemoteRunner.runRemote(ccs, jcns, null);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
+            NodeControllerState ncs = ccs.getNodeMap().get(n);
+            try {
+                ncs.getNodeController().cleanUpJoblet(jobId, status);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
-        });
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
index 14947cd..b7cf629 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
 import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
 public class JobCreateWork extends SynchronizableWork {
@@ -38,42 +39,50 @@
     private final EnumSet<JobFlag> jobFlags;
     private final JobId jobId;
     private final String appName;
+    private final IResultCallback<JobId> callback;
 
     public JobCreateWork(ClusterControllerService ccs, JobId jobId, String appName, byte[] jobSpec,
-            EnumSet<JobFlag> jobFlags) {
+            EnumSet<JobFlag> jobFlags, IResultCallback<JobId> callback) {
         this.jobId = jobId;
         this.ccs = ccs;
         this.jobSpec = jobSpec;
         this.jobFlags = jobFlags;
         this.appName = appName;
+        this.callback = callback;
     }
 
     @Override
     protected void doRun() throws Exception {
-        CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
-        if (appCtx == null) {
-            throw new HyracksException("No application with id " + appName + " found");
-        }
-        JobSpecification spec = appCtx.createJobSpecification(jobSpec);
-
-        final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
-        builder.init(appName, spec, jobFlags);
-        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
-            @Override
-            public void visit(IOperatorDescriptor op) {
-                op.contributeActivities(builder);
+        try {
+            CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
+            if (appCtx == null) {
+                throw new HyracksException("No application with id " + appName + " found");
             }
-        });
-        final JobActivityGraph jag = builder.getActivityGraph();
+            JobSpecification spec = appCtx.createJobSpecification(jobSpec);
 
-        JobRun run = new JobRun(jobId, jag);
+            final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
+            builder.init(appName, spec, jobFlags);
+            PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+                @Override
+                public void visit(IOperatorDescriptor op) {
+                    op.contributeActivities(builder);
+                }
+            });
+            final JobActivityGraph jag = builder.getActivityGraph();
 
-        run.setStatus(JobStatus.INITIALIZED, null);
+            JobRun run = new JobRun(jobId, jag);
 
-        ccs.getActiveRunMap().put(jobId, run);
-        JobScheduler jrs = new JobScheduler(ccs, run);
-        run.setScheduler(jrs);
-        appCtx.notifyJobCreation(jobId, spec);
+            run.setStatus(JobStatus.INITIALIZED, null);
+
+            ccs.getActiveRunMap().put(jobId, run);
+            JobScheduler jrs = new JobScheduler(ccs, run);
+            run.setScheduler(jrs);
+            appCtx.notifyJobCreation(jobId, spec);
+            callback.setValue(jobId);
+        } catch (Exception e) {
+            callback.setException(e);
+            return;
+        }
     }
 
     public JobId getJobId() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index 855e3e7..dd834ed 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -18,31 +18,39 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
     private final JobId jobId;
+    private final IResultCallback<Object> callback;
 
-    public JobStartWork(ClusterControllerService ccs, JobId jobId) {
+    public JobStartWork(ClusterControllerService ccs, JobId jobId, IResultCallback<Object> callback) {
         this.ccs = ccs;
         this.jobId = jobId;
+        this.callback = callback;
     }
 
     @Override
     protected void doRun() throws Exception {
-        JobRun run = ccs.getActiveRunMap().get(jobId);
-        if (run == null) {
-            throw new Exception("Unable to find job with id = " + jobId);
-        }
-        if (run.getStatus() != JobStatus.INITIALIZED) {
-            throw new Exception("Job already started");
-        }
-        run.setStatus(JobStatus.RUNNING, null);
         try {
-            run.getScheduler().startJob();
+            JobRun run = ccs.getActiveRunMap().get(jobId);
+            if (run == null) {
+                throw new Exception("Unable to find job with id = " + jobId);
+            }
+            if (run.getStatus() != JobStatus.INITIALIZED) {
+                throw new Exception("Job already started");
+            }
+            run.setStatus(JobStatus.RUNNING, null);
+            try {
+                run.getScheduler().startJob();
+            } catch (Exception e) {
+                ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
+            }
+            callback.setValue(null);
         } catch (Exception e) {
-            ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, run.getJobId(), JobStatus.FAILURE, e));
+            callback.setException(e);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index e65b1de..cba4b4b5 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -17,36 +17,62 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 
 public class RegisterNodeWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-    private final String nodeId;
-    private final NodeControllerState state;
+    private static final Logger LOGGER = Logger.getLogger(RegisterNodeWork.class.getName());
 
-    public RegisterNodeWork(ClusterControllerService ccs, String nodeId, NodeControllerState state) {
+    private final ClusterControllerService ccs;
+    private final NodeRegistration reg;
+
+    public RegisterNodeWork(ClusterControllerService ccs, NodeRegistration reg) {
         this.ccs = ccs;
-        this.nodeId = nodeId;
-        this.state = state;
+        this.reg = reg;
     }
 
     @Override
     protected void doRun() throws Exception {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        if (nodeMap.containsKey(nodeId)) {
-            throw new Exception("Node with this name already registered.");
+        String id = reg.getNodeId();
+
+        IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
+        NodeControllerFunctions.NodeRegistrationResult result = null;
+        try {
+            INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
+
+            NodeControllerState state = new NodeControllerState(nodeController, reg);
+            Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+            if (nodeMap.containsKey(id)) {
+                throw new Exception("Node with this name already registered.");
+            }
+            nodeMap.put(id, state);
+            Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
+            String ipAddress = state.getNCConfig().dataIPAddress;
+            Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+            if (nodes == null) {
+                nodes = new HashSet<String>();
+                ipAddressNodeNameMap.put(ipAddress, nodes);
+            }
+            nodes.add(id);
+            LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+            NodeParameters params = new NodeParameters();
+            params.setClusterControllerInfo(ccs.getClusterControllerInfo());
+            params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
+            params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+            result = new NodeControllerFunctions.NodeRegistrationResult(params, null);
+        } catch (Exception e) {
+            result = new NodeControllerFunctions.NodeRegistrationResult(null, e);
         }
-        nodeMap.put(nodeId, state);
-        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIpAddressNodeNameMap();
-        String ipAddress = state.getNCConfig().dataIPAddress;
-        Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
-        if (nodes == null) {
-            nodes = new HashSet<String>();
-            ipAddressNodeNameMap.put(ipAddress, nodes);
-        }
-        nodes.add(nodeId);
+        ncIPCHandle.send(-1, result, null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
new file mode 100644
index 0000000..6cfe025
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.IJobStatusConditionVariable;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class WaitForJobCompletionWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    private final IResultCallback<Object> callback;
+
+    public WaitForJobCompletionWork(ClusterControllerService ccs, JobId jobId, IResultCallback<Object> callback) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        final IJobStatusConditionVariable cRunningVar = ccs.getActiveRunMap().get(jobId);
+        if (cRunningVar != null) {
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        cRunningVar.waitForCompletion();
+                        callback.setValue(null);
+                    } catch (Exception e) {
+                        callback.setException(e);
+                    }
+                }
+            });
+        } else {
+            final IJobStatusConditionVariable cArchivedVar = ccs.getRunMapArchive().get(jobId);
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        cArchivedVar.waitForCompletion();
+                        callback.setValue(null);
+                    } catch (Exception e) {
+                        callback.setException(e);
+                    }
+                }
+            });
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
index 42d0ecb..21e3e9d 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
@@ -19,4 +19,4 @@
 public abstract class AbstractRemoteService implements IService {
     public AbstractRemoteService() {
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index d2a06be..9f29fbd 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -59,7 +59,6 @@
         this.serverCtx = serverCtx;
         this.appName = appName;
         this.applicationRootDir = new File(new File(serverCtx.getBaseDir(), APPLICATION_ROOT), appName);
-        status = ApplicationStatus.CREATED;
         FileUtils.deleteDirectory(applicationRootDir);
         applicationRootDir.mkdirs();
     }
@@ -87,9 +86,6 @@
     }
 
     public void initialize() throws Exception {
-        if (status != ApplicationStatus.CREATED) {
-            throw new IllegalStateException();
-        }
         if (deploymentDescriptor != null) {
             String bootstrapClass = null;
             switch (serverCtx.getServerType()) {
@@ -107,7 +103,6 @@
                 start();
             }
         }
-        status = ApplicationStatus.INITIALIZED;
     }
 
     protected abstract void start() throws Exception;
@@ -168,7 +163,6 @@
     }
 
     public void deinitialize() throws Exception {
-        status = ApplicationStatus.DEINITIALIZED;
         stop();
         File expandedFolder = getExpandedFolder();
         FileUtils.deleteDirectory(expandedFolder);
@@ -203,4 +197,12 @@
     public ClassLoader getClassLoader() {
         return classLoader;
     }
-}
+
+    public void setStatus(ApplicationStatus status) {
+        this.status = status;
+    }
+
+    public ApplicationStatus getStatus() {
+        return status;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
index 8a97ad6..b45c4f1 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
@@ -16,6 +16,8 @@
 
 public enum ApplicationStatus {
     CREATED,
+    IN_INITIALIZATION,
     INITIALIZED,
+    IN_DEINITIALIZATION,
     DEINITIALIZED
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 40e9347..a03f5c9 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,7 +18,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -27,7 +27,7 @@
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 
 public interface IClusterController {
-    public NodeParameters registerNode(NodeRegistration reg) throws Exception;
+    public void registerNode(NodeRegistration reg) throws Exception;
 
     public void unregisterNode(String nodeId) throws Exception;
 
@@ -45,4 +45,6 @@
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+
+    public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
deleted file mode 100644
index e35f962..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IIPCI;
-
-public class ClusterControllerDelegateIPCI implements IIPCI {
-    private final IClusterController cc;
-
-    public ClusterControllerDelegateIPCI(IClusterController cc) {
-        this.cc = cc;
-    }
-
-    @Override
-    public Object call(IIPCHandle caller, Object req) throws Exception {
-        ClusterControllerFunctions.Function fn = (Function) req;
-        switch (fn.getFunctionId()) {
-            case REGISTER_NODE: {
-                ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
-                return cc.registerNode(rnf.getNodeRegistration());
-            }
-
-            case UNREGISTER_NODE: {
-                ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
-                cc.unregisterNode(unf.getNodeId());
-                return null;
-            }
-
-            case NODE_HEARTBEAT: {
-                ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
-                cc.nodeHeartbeat(nhf.getNodeId(), nhf.getHeartbeatData());
-                return null;
-            }
-
-            case NOTIFY_JOBLET_CLEANUP: {
-                ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
-                cc.notifyJobletCleanup(njcf.getJobId(), njcf.getNodeId());
-                return null;
-            }
-
-            case REPORT_PROFILE: {
-                ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
-                cc.reportProfile(rpf.getNodeId(), rpf.getProfiles());
-                return null;
-            }
-
-            case NOTIFY_TASK_COMPLETE: {
-                ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
-                cc.notifyTaskComplete(ntcf.getJobId(), ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics());
-                return null;
-            }
-            case NOTIFY_TASK_FAILURE: {
-                ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
-                cc.notifyTaskFailure(ntff.getJobId(), ntff.getTaskId(), ntff.getDetails(), ntff.getDetails());
-                return null;
-            }
-
-            case REGISTER_PARTITION_PROVIDER: {
-                ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
-                cc.registerPartitionProvider(rppf.getPartitionDescriptor());
-                return null;
-            }
-
-            case REGISTER_PARTITION_REQUEST: {
-                ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
-                cc.registerPartitionRequest(rprf.getPartitionRequest());
-                return null;
-            }
-        }
-        throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
index 4c76357..2b6792e 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
@@ -19,6 +19,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -37,6 +38,7 @@
         REPORT_PROFILE,
         REGISTER_PARTITION_PROVIDER,
         REGISTER_PARTITION_REQUEST,
+        APPLICATION_STATE_CHANGE_RESPONSE,
     }
 
     public static abstract class Function implements Serializable {
@@ -269,4 +271,35 @@
             return partitionRequest;
         }
     }
+
+    public static class ApplicationStateChangeResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+        private final String appName;
+        private final ApplicationStatus status;
+
+        public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+            this.nodeId = nodeId;
+            this.appName = appName;
+            this.status = status;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getApplicationName() {
+            return appName;
+        }
+
+        public ApplicationStatus getStatus() {
+            return status;
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0aeab72..b7dd0af 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,8 +18,8 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
 
 public class ClusterControllerRemoteProxy implements IClusterController {
     private final IIPCHandle ipcHandle;
@@ -37,75 +36,72 @@
     }
 
     @Override
-    public NodeParameters registerNode(NodeRegistration reg) throws Exception {
-        SyncRMI sync = new SyncRMI();
+    public void registerNode(NodeRegistration reg) throws Exception {
         ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
-        NodeParameters result = (NodeParameters) sync.call(ipcHandle, fn);
-        return result;
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
                 nodeId);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
                 jobId, taskId, nodeId, statistics);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
                 jobId, taskId, nodeId, details);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
                 jobId, nodeId);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
                 hbData);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
                 profiles);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
-        SyncRMI sync = new SyncRMI();
         ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
                 partitionRequest);
-        sync.call(ipcHandle, fn);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
+        ClusterControllerFunctions.ApplicationStateChangeResponseFunction fn = new ClusterControllerFunctions.ApplicationStateChangeResponseFunction(
+                nodeId, appName, status);
+        ipcHandle.send(-1, fn, null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
deleted file mode 100644
index f3e51ec..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import edu.uci.ics.hyracks.control.common.base.INodeController;
-import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IIPCI;
-
-public class NodeControllerDelegateIPCI implements IIPCI {
-    private final INodeController nc;
-
-    public NodeControllerDelegateIPCI(INodeController nc) {
-        this.nc = nc;
-    }
-
-    @Override
-    public Object call(IIPCHandle caller, Object req) throws Exception {
-        NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) req;
-        switch (fn.getFunctionId()) {
-            case START_TASKS: {
-                NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
-                nc.startTasks(stf.getAppName(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(),
-                        stf.getConnectorPolicies());
-                return null;
-            }
-
-            case ABORT_TASKS: {
-                NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
-                nc.abortTasks(atf.getJobId(), atf.getTasks());
-                return null;
-            }
-
-            case CLEANUP_JOBLET: {
-                NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
-                nc.cleanUpJoblet(cjf.getJobId(), cjf.getStatus());
-                return null;
-            }
-
-            case CREATE_APPLICATION: {
-                NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
-                nc.createApplication(caf.getAppName(), caf.isDeployHar(), caf.getSerializedDistributedState());
-                return null;
-            }
-
-            case DESTROY_APPLICATION: {
-                NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
-                nc.destroyApplication(daf.getAppName());
-                return null;
-            }
-
-            case REPORT_PARTITION_AVAILABILITY: {
-                NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
-                nc.reportPartitionAvailability(rpaf.getPartitionId(), rpaf.getNetworkAddress());
-                return null;
-            }
-        }
-        throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
index 0d39c1b..b72d4e5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
@@ -25,10 +25,12 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 
 public class NodeControllerFunctions {
     public enum FunctionId {
+        NODE_REGISTRATION_RESULT,
         START_TASKS,
         ABORT_TASKS,
         CLEANUP_JOBLET,
@@ -43,6 +45,32 @@
         public abstract FunctionId getFunctionId();
     }
 
+    public static class NodeRegistrationResult extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final NodeParameters params;
+
+        private final Exception exception;
+
+        public NodeRegistrationResult(NodeParameters params, Exception exception) {
+            this.params = params;
+            this.exception = exception;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_REGISTRATION_RESULT;
+        }
+
+        public NodeParameters getNodeParameters() {
+            return params;
+        }
+
+        public Exception getException() {
+            return exception;
+        }
+    }
+
     public static class StartTasksFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index ccd9468..474fb93 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
 
 public class NodeControllerRemoteProxy implements INodeController {
     private final IIPCHandle ipcHandle;
@@ -39,48 +38,43 @@
     @Override
     public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
             Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
                 planBytes, taskDescriptors, connectorPolicies);
-        sync.call(ipcHandle, stf);
+        ipcHandle.send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
-        sync.call(ipcHandle, atf);
+        ipcHandle.send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
                 status);
-        sync.call(ipcHandle, cjf);
+        ipcHandle.send(-1, cjf, null);
     }
 
     @Override
     public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
             throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
                 appName, deployHar, serializedDistributedState);
-        sync.call(ipcHandle, caf);
+        ipcHandle.send(-1, caf, null);
     }
 
     @Override
     public void destroyApplication(String appName) throws Exception {
-        SyncRMI sync = new SyncRMI();
         NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
                 appName);
-        sync.call(ipcHandle, daf);
+        ipcHandle.send(-1, daf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
         NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
-        ipcHandle.send(rpaf, null);
+        ipcHandle.send(-1, rpaf, null);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
index 00565b6..7eb4ff6 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.common.work;
 
-public class FutureValue<T> {
+public class FutureValue<T> implements IResultCallback<T> {
     private boolean done;
 
     private T value;
@@ -27,6 +27,7 @@
         e = null;
     }
 
+    @Override
     public synchronized void setValue(T value) {
         done = true;
         this.value = value;
@@ -34,6 +35,7 @@
         notifyAll();
     }
 
+    @Override
     public synchronized void setException(Exception e) {
         done = true;
         this.e = e;
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
new file mode 100644
index 0000000..dcea864
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.common.work;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCResponder<T> implements IResultCallback<T> {
+    private final IIPCHandle handle;
+
+    private final long rmid;
+
+    public IPCResponder(IIPCHandle handle, long rmid) {
+        this.handle = handle;
+        this.rmid = rmid;
+    }
+
+    @Override
+    public void setValue(T result) {
+        try {
+            handle.send(rmid, result, null);
+        } catch (IPCException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void setException(Exception e) {
+        try {
+            handle.send(rmid, null, e);
+        } catch (IPCException e1) {
+            e1.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/Accumulator.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
similarity index 78%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/Accumulator.java
rename to hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
index c728b0b..80c3d76 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/Accumulator.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
@@ -12,10 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.control.cc.remote;
+package edu.uci.ics.hyracks.control.common.work;
 
-public interface Accumulator<T, R> {
-    public void accumulate(T o);
+public interface IResultCallback<T> {
+    public void setValue(T result);
 
-    public R getResult();
+    public void setException(Exception e);
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index e211859..ba1d970 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -39,18 +39,11 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
-import edu.uci.ics.hyracks.control.common.base.INodeController;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
@@ -58,8 +51,7 @@
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
 import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerDelegateIPCI;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -76,10 +68,11 @@
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.PerformanceCounters;
 
-public class NodeControllerService extends AbstractRemoteService implements INodeController {
+public class NodeControllerService extends AbstractRemoteService {
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
 
     private NCConfig ncConfig;
@@ -98,6 +91,10 @@
 
     private final Timer timer;
 
+    private boolean registrationPending;
+
+    private Exception registrationException;
+
     private IClusterController ccs;
 
     private final Map<JobId, Joblet> jobletMap;
@@ -126,8 +123,8 @@
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
         executor = Executors.newCachedThreadPool();
-        NodeControllerDelegateIPCI ipci = new NodeControllerDelegateIPCI(this);
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci, executor);
+        NodeControllerIPCI ipci = new NodeControllerIPCI();
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci);
         this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
         if (id == null) {
             throw new Exception("id not set");
@@ -146,6 +143,7 @@
         threadMXBean = ManagementFactory.getThreadMXBean();
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
+        registrationPending = true;
     }
 
     public IHyracksRootContext getRootContext() {
@@ -162,6 +160,13 @@
         return devices;
     }
 
+    private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+        this.nodeParameters = parameters;
+        this.registrationException = exception;
+        this.registrationPending = false;
+        notifyAll();
+    }
+
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -174,9 +179,19 @@
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        this.nodeParameters = ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager
-                .getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean
-                .getAvailableProcessors(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                hbSchema));
+
+        synchronized (this) {
+            while (registrationPending) {
+                wait();
+            }
+        }
+        if (registrationException != null) {
+            throw registrationException;
+        }
+
         queue.start();
 
         heartbeatTask = new HeartbeatTask(ccs);
@@ -238,53 +253,10 @@
         return executor;
     }
 
-    @Override
-    public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
-            List<TaskAttemptDescriptor> taskDescriptors,
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
-        StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap);
-        queue.schedule(stw);
-    }
-
-    @Override
-    public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        CleanupJobletWork cjw = new CleanupJobletWork(this, jobId, status);
-        queue.schedule(cjw);
-    }
-
     public NCConfig getConfiguration() throws Exception {
         return ncConfig;
     }
 
-    @Override
-    public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        AbortTasksWork atw = new AbortTasksWork(this, jobId, tasks);
-        queue.schedule(atw);
-    }
-
-    @Override
-    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
-            throws Exception {
-        FutureValue<Object> fv = new FutureValue<Object>();
-        CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState, fv);
-        queue.schedule(caw);
-        fv.get();
-    }
-
-    @Override
-    public void destroyApplication(String appName) throws Exception {
-        FutureValue<Object> fv = new FutureValue<Object>();
-        DestroyApplicationWork daw = new DestroyApplicationWork(this, appName, fv);
-        queue.schedule(daw);
-        fv.get();
-    }
-
-    @Override
-    public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
-        ReportPartitionAvailabilityWork rpaw = new ReportPartitionAvailabilityWork(this, pid, networkAddress);
-        queue.scheduleAndSync(rpaw);
-    }
-
     private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
         String ipaddrStr = ncConfig.dataIPAddress;
         ipaddrStr = ipaddrStr.trim();
@@ -371,4 +343,59 @@
             }
         }
     }
+
+    private final class NodeControllerIPCI implements IIPCI {
+        @Override
+        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+            NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) payload;
+            switch (fn.getFunctionId()) {
+                case START_TASKS: {
+                    NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
+                            .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
+                    return;
+                }
+
+                case ABORT_TASKS: {
+                    NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+                    queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+                    return;
+                }
+
+                case CLEANUP_JOBLET: {
+                    NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+                    queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+                    return;
+                }
+
+                case CREATE_APPLICATION: {
+                    NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+                    queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
+                            .isDeployHar(), caf.getSerializedDistributedState()));
+                    return;
+                }
+
+                case DESTROY_APPLICATION: {
+                    NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+                    queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
+                    return;
+                }
+
+                case REPORT_PARTITION_AVAILABILITY: {
+                    NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+                            .getPartitionId(), rpaf.getNetworkAddress()));
+                    return;
+                }
+
+                case NODE_REGISTRATION_RESULT: {
+                    NodeControllerFunctions.NodeRegistrationResult nrrf = (NodeControllerFunctions.NodeRegistrationResult) fn;
+                    setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index 5b60a08..eb982df 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -27,9 +27,9 @@
 import org.apache.http.impl.client.DefaultHttpClient;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -45,15 +45,12 @@
 
     private final byte[] serializedDistributedState;
 
-    private final FutureValue<Object> fv;
-
     public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
-            byte[] serializedDistributedState, FutureValue<Object> fv) {
+            byte[] serializedDistributedState) {
         this.ncs = ncs;
         this.appName = appName;
         this.deployHar = deployHar;
         this.serializedDistributedState = serializedDistributedState;
-        this.fv = fv;
     }
 
     @Override
@@ -85,9 +82,10 @@
             appCtx.initializeClassPath();
             appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
             appCtx.initialize();
-            fv.setValue(null);
+            ncs.getClusterController()
+                    .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
         } catch (Exception e) {
-            fv.setException(e);
+            LOGGER.warning("Error creating application: " + e.getMessage());
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
index 8ac0b5c..cfe00f6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -18,7 +18,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
-import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
@@ -30,12 +30,9 @@
 
     private final String appName;
 
-    private FutureValue<Object> fv;
-
-    public DestroyApplicationWork(NodeControllerService ncs, String appName, FutureValue<Object> fv) {
+    public DestroyApplicationWork(NodeControllerService ncs, String appName) {
         this.ncs = ncs;
         this.appName = appName;
-        this.fv = fv;
     }
 
     @Override
@@ -46,9 +43,9 @@
             if (appCtx != null) {
                 appCtx.deinitialize();
             }
-            fv.setValue(null);
         } catch (Exception e) {
-            fv.setException(e);
+            LOGGER.warning("Error destroying application: " + e.getMessage());
         }
+        ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.DEINITIALIZED);
     }
 }
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index fac141c..4ed8361 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -116,7 +116,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(jobId.toString());
         }
-        cc.waitForCompletion(jobId);
+        hcc.waitForCompletion(jobId);
         dumpOutputFiles();
     }
 
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
index a4adf23..8a3630f 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCHandle.java
@@ -21,7 +21,7 @@
 public interface IIPCHandle {
     public InetSocketAddress getRemoteAddress();
 
-    public void send(Object request, IResponseCallback callback) throws IPCException;
+    public long send(long requestId, Object payload, Exception exception) throws IPCException;
 
     public void setAttachment(Object attachment);
 
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
index ba9f343..24ab943 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IIPCI.java
@@ -15,5 +15,5 @@
 package edu.uci.ics.hyracks.ipc.api;
 
 public interface IIPCI {
-    public Object call(IIPCHandle caller, Object req) throws Exception;
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
 }
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
new file mode 100644
index 0000000..3340516
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/RPCInterface.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RPCInterface implements IIPCI {
+    private final Map<Long, Request> reqMap;
+
+    public RPCInterface() {
+        reqMap = new HashMap<Long, RPCInterface.Request>();
+    }
+
+    public Object call(IIPCHandle handle, Object request) throws Exception {
+        Request req;
+        synchronized (this) {
+            req = new Request();
+            long mid = handle.send(-1, request, null);
+            reqMap.put(mid, req);
+        }
+        return req.getResponse();
+    }
+
+    @Override
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+        Request req;
+        synchronized (this) {
+            req = reqMap.remove(rmid);
+        }
+        assert req != null;
+        if (exception != null) {
+            req.setException(exception);
+        } else {
+            req.setResult(payload);
+        }
+    }
+
+    private static class Request {
+        private boolean pending;
+
+        private Object result;
+
+        private Exception exception;
+
+        Request() {
+            pending = true;
+            result = null;
+            exception = null;
+        }
+
+        synchronized void setResult(Object result) {
+            this.pending = false;
+            this.result = result;
+            notifyAll();
+        }
+
+        synchronized void setException(Exception exception) {
+            this.pending = false;
+            this.exception = exception;
+            notifyAll();
+        }
+
+        synchronized Object getResponse() throws Exception {
+            while (pending) {
+                wait();
+            }
+            if (exception != null) {
+                throw exception;
+            }
+            return result;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
deleted file mode 100644
index 180b1dd..0000000
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/SyncRMI.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.ipc.api;
-
-public final class SyncRMI implements IResponseCallback {
-    private boolean pending;
-
-    private Object response;
-
-    private Exception exception;
-
-    public SyncRMI() {
-    }
-
-    @Override
-    public synchronized void callback(IIPCHandle handle, Object response, Exception exception) {
-        pending = false;
-        this.response = response;
-        this.exception = exception;
-        notifyAll();
-    }
-
-    public synchronized Object call(IIPCHandle handle, Object request) throws Exception {
-        pending = true;
-        response = null;
-        exception = null;
-        handle.send(request, this);
-        while (pending) {
-            wait();
-        }
-        if (exception != null) {
-            throw exception;
-        }
-        return response;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 3fb4d60..99ddad5 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -162,7 +162,9 @@
                         LOGGER.fine("Starting Select");
                     }
                     int n = selector.select();
-                    swapReadersAndWriters();
+                    if (pendingConnections[readerIndex].isEmpty() && sendList[readerIndex].isEmpty()) {
+                        swapReadersAndWriters();
+                    }
                     if (!pendingConnections[readerIndex].isEmpty()) {
                         for (IPCHandle handle : pendingConnections[readerIndex]) {
                             SocketChannel channel = SocketChannel.open();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 481a0b0..f3c05c6 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -17,11 +17,8 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.util.HashMap;
-import java.util.Map;
 
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
-import edu.uci.ics.hyracks.ipc.api.IResponseCallback;
 import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 
 final class IPCHandle implements IIPCHandle {
@@ -31,8 +28,6 @@
 
     private InetSocketAddress remoteAddress;
 
-    private final Map<Long, IResponseCallback> pendingRequestMap;
-
     private HandleState state;
 
     private SelectionKey key;
@@ -48,7 +43,6 @@
     IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
         this.system = system;
         this.remoteAddress = remoteAddress;
-        pendingRequestMap = new HashMap<Long, IResponseCallback>();
         inBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
         outBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
         outBuffer.flip();
@@ -65,19 +59,23 @@
     }
 
     @Override
-    public synchronized void send(Object req, IResponseCallback callback) throws IPCException {
+    public synchronized long send(long requestId, Object req, Exception exception) throws IPCException {
         if (state != HandleState.CONNECTED) {
             throw new IPCException("Handle is not in Connected state");
         }
         Message msg = new Message(this);
         long mid = system.createMessageId();
         msg.setMessageId(mid);
-        msg.setRequestMessageId(-1);
-        msg.setPayload(req);
-        if (callback != null) {
-            pendingRequestMap.put(mid, callback);
+        msg.setRequestMessageId(requestId);
+        if (exception != null) {
+            msg.setFlag(Message.ERROR);
+            msg.setPayload(exception);
+        } else {
+            msg.setFlag(Message.NORMAL);
+            msg.setPayload(req);
         }
         system.getConnectionManager().write(msg);
+        return mid;
     }
 
     @Override
@@ -110,7 +108,7 @@
         this.state = state;
         notifyAll();
     }
-    
+
     synchronized void waitTillConnected() throws InterruptedException {
         while (!isConnected()) {
             wait();
@@ -127,9 +125,6 @@
 
     synchronized void close() {
         setState(HandleState.CLOSED);
-        for (IResponseCallback cb : pendingRequestMap.values()) {
-            cb.callback(this, null, new IPCException("IPC Handle Closed"));
-        }
     }
 
     synchronized void processIncomingMessages() {
@@ -157,19 +152,7 @@
                 }
                 continue;
             }
-            long requestMessageId = message.getRequestMessageId();
-            if (requestMessageId < 0) {
-                system.deliverIncomingMessage(message);
-            } else {
-                Long rid = Long.valueOf(requestMessageId);
-                IResponseCallback cb = pendingRequestMap.remove(rid);
-                if (cb != null) {
-                    byte flag = message.getFlag();
-                    Object payload = flag == Message.ERROR ? null : message.getPayload();
-                    Exception exception = (Exception) (flag == Message.ERROR ? message.getPayload() : null);
-                    cb.callback(this, payload, exception);
-                }
-            }
+            system.deliverIncomingMessage(message);
         }
         inBuffer.compact();
     }
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9eef8ff..5f60c6d 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -28,18 +27,15 @@
 
     private final IIPCI ipci;
 
-    private final Executor executor;
-
     private final AtomicLong midFactory;
 
     public IPCSystem(InetSocketAddress socketAddress) throws IOException {
-        this(socketAddress, null, null);
+        this(socketAddress, null);
     }
 
-    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, Executor executor) throws IOException {
+    public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci) throws IOException {
         cMgr = new IPCConnectionManager(this, socketAddress);
         this.ipci = ipci;
-        this.executor = executor;
         midFactory = new AtomicLong();
     }
 
@@ -66,25 +62,16 @@
     }
 
     void deliverIncomingMessage(final Message message) {
-        assert message.getFlag() == Message.NORMAL;
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                IPCHandle handle = message.getIPCHandle();
-                Message response = new Message(handle);
-                response.setMessageId(createMessageId());
-                response.setRequestMessageId(message.getMessageId());
-                response.setFlag(Message.NORMAL);
-                try {
-                    Object result = ipci.call(handle, message.getPayload());
-                    response.setPayload(result);
-                } catch (Exception e) {
-                    response.setFlag(Message.ERROR);
-                    response.setPayload(e);
-                }
-                cMgr.write(response);
-            }
-        });
+        long mid = message.getMessageId();
+        long rmid = message.getRequestMessageId();
+        Object payload = null;
+        Exception exception = null;
+        if (message.getFlag() == Message.ERROR) {
+            exception = (Exception) message.getPayload();
+        } else {
+            payload = message.getPayload();
+        }
+        ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, payload, exception);
     }
 
     IPCConnectionManager getConnectionManager() {
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index 24cea1d..dac93dd 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -25,7 +25,8 @@
 
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
-import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
 import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
 
 public class IPCTest {
@@ -35,20 +36,18 @@
         server.start();
         InetSocketAddress serverAddr = server.getSocketAddress();
 
-        IPCSystem client = createClientIPCSystem();
+        RPCInterface rpci = new RPCInterface();
+        IPCSystem client = createClientIPCSystem(rpci);
         client.start();
 
         IIPCHandle handle = client.getHandle(serverAddr);
 
-        SyncRMI rmi = new SyncRMI();
         for (int i = 0; i < 100; ++i) {
-            Assert.assertEquals(rmi.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+            Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
         }
 
-        IIPCHandle rHandle = server.getHandle(client.getSocketAddress());
-
         try {
-            rmi.call(rHandle, "Foo");
+            rpci.call(handle, "Foo");
             Assert.assertTrue(false);
         } catch (Exception e) {
             Assert.assertTrue(true);
@@ -56,25 +55,35 @@
     }
 
     private IPCSystem createServerIPCSystem() throws IOException {
-        Executor executor = Executors.newCachedThreadPool();
+        final Executor executor = Executors.newCachedThreadPool();
         IIPCI ipci = new IIPCI() {
             @Override
-            public Object call(IIPCHandle caller, Object req) throws Exception {
-                Integer i = (Integer) req;
-                return i.intValue() * 2;
+            public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid,
+                    final Object payload, Exception exception) {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        Object result = null;
+                        Exception exception = null;
+                        try {
+                            Integer i = (Integer) payload;
+                            result = i.intValue() * 2;
+                        } catch (Exception e) {
+                            exception = e;
+                        }
+                        try {
+                            handle.send(mid, result, exception);
+                        } catch (IPCException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
             }
         };
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci);
     }
 
-    private IPCSystem createClientIPCSystem() throws IOException {
-        Executor executor = Executors.newCachedThreadPool();
-        IIPCI ipci = new IIPCI() {
-            @Override
-            public Object call(IIPCHandle caller, Object req) throws Exception {
-                throw new IllegalStateException();
-            }
-        };
-        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci, executor);
+    private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
+        return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci);
     }
 }
\ No newline at end of file