Extract IPCIs out of ClusterControllerService

moving the two IPCIs out of cluster controller service is a good
start to cleanup the class. In addition, this change renames queue
to workQueue in NodeControllerService for consistency.

Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1325
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index a7c2a36..ef0af26 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -33,13 +33,11 @@
     public enum FunctionId {
         GET_CLUSTER_CONTROLLER_INFO,
         GET_CLUSTER_TOPOLOGY,
-        CREATE_JOB,
         GET_JOB_STATUS,
         GET_JOB_INFO,
         START_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
-        GET_DATASET_RECORD_DESCRIPTOR,
         GET_DATASET_RESULT_LOCATIONS,
         WAIT_FOR_COMPLETION,
         GET_NODE_CONTROLLERS_INFO,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
new file mode 100644
index 0000000..01c3bf5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobStatusWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
+import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import org.apache.hyracks.control.cc.work.GetResultStatusWork;
+import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
+import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+class ClientInterfaceIPCI implements IIPCI {
+
+    private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName());
+    private final ClusterControllerService ccs;
+    private final JobIdFactory jobIdFactory;
+
+    ClientInterfaceIPCI(ClusterControllerService ccs) {
+        this.ccs = ccs;
+        jobIdFactory = new JobIdFactory();
+    }
+
+    @Override
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
+            Exception exception) {
+        HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
+        switch (fn.getFunctionId()) {
+            case GET_CLUSTER_CONTROLLER_INFO:
+                try {
+                    handle.send(mid, ccs.getClusterControllerInfo(), null);
+                } catch (IPCException e) {
+                    LOGGER.log(Level.WARNING, "Error sending response to GET_CLUSTER_CONTROLLER_INFO request", e);
+                }
+                break;
+            case GET_JOB_STATUS:
+                HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                        (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs, gjsf.getJobId(),
+                        new IPCResponder<JobStatus>(handle, mid)));
+                break;
+            case GET_JOB_INFO:
+                HyracksClientInterfaceFunctions.GetJobInfoFunction gjif =
+                        (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+                ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, gjif.getJobId(),
+                        new IPCResponder<JobInfo>(handle, mid)));
+                break;
+            case START_JOB:
+                HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                        (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                JobId jobId = jobIdFactory.create();
+                ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(),
+                        sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+                break;
+            case GET_DATASET_DIRECTORY_SERIVICE_INFO:
+                ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
+                        new IPCResponder<NetworkAddress>(handle, mid)));
+                break;
+            case GET_DATASET_RESULT_STATUS:
+                HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf =
+                        (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+                ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(),
+                        gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
+                break;
+            case GET_DATASET_RESULT_LOCATIONS:
+                HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+                        (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs,
+                        gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case WAIT_FOR_COMPLETION:
+                HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                        (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(),
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case GET_NODE_CONTROLLERS_INFO:
+                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case GET_CLUSTER_TOPOLOGY:
+                try {
+                    handle.send(mid, ccs.getCCContext().getClusterTopology(), null);
+                } catch (IPCException e) {
+                    LOGGER.log(Level.WARNING, "Error sending response to GET_CLUSTER_TOPOLOGY request", e);
+                }
+                break;
+            case CLI_DEPLOY_BINARY:
+                HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                        (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+                ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(),
+                        dbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
+                break;
+            case CLI_UNDEPLOY_BINARY:
+                HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
+                        (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+                ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(),
+                        new IPCResponder<>(handle, mid)));
+                break;
+            case CLUSTER_SHUTDOWN:
+                HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
+                        (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
+                ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs,
+                        csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
+                break;
+            case GET_NODE_DETAILS_JSON:
+                HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
+                        (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
+                ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs, gndjf.getNodeId(),
+                        gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
+                break;
+            case THREAD_DUMP:
+                HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
+                        (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
+                ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, tdf.getNode(),
+                        new IPCResponder<String>(handle, mid)));
+                break;
+            default:
+                try {
+                    handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
+                } catch (IPCException e) {
+                    LOGGER.log(Level.WARNING, "Error sending Unknown function response", e);
+                }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
new file mode 100644
index 0000000..b6c9a08
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
+import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
+import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
+import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
+import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
+import org.apache.hyracks.control.cc.work.RegisterNodeWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
+import org.apache.hyracks.control.cc.work.ReportProfilesWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
+import org.apache.hyracks.control.cc.work.TaskCompleteWork;
+import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+
+class ClusterControllerIPCI implements IIPCI {
+    private static final Logger LOGGER = Logger.getLogger(ClusterControllerIPCI.class.getName());
+    private final ClusterControllerService ccs;
+
+    ClusterControllerIPCI(ClusterControllerService ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+            Exception exception) {
+        CCNCFunctions.Function fn = (Function) payload;
+        switch (fn.getFunctionId()) {
+            case REGISTER_NODE:
+                CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
+                ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs, rnf.getNodeRegistration()));
+                break;
+            case UNREGISTER_NODE:
+                CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
+                ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs, unf.getNodeId()));
+                break;
+            case NODE_HEARTBEAT:
+                CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
+                ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs, nhf.getNodeId(),
+                        nhf.getHeartbeatData()));
+                break;
+            case NOTIFY_JOBLET_CLEANUP:
+                CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
+                ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(),
+                        njcf.getNodeId()));
+                break;
+            case NOTIFY_DEPLOY_BINARY:
+                CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
+                ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(),
+                        ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+                break;
+            case REPORT_PROFILE:
+                CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
+                ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs, rpf.getProfiles()));
+                break;
+            case NOTIFY_TASK_COMPLETE:
+                CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
+                ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(),
+                        ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+                break;
+            case NOTIFY_TASK_FAILURE:
+                CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
+                ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
+                        ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+                break;
+            case REGISTER_PARTITION_PROVIDER:
+                CCNCFunctions.RegisterPartitionProviderFunction rppf =
+                        (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+                ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs,
+                        rppf.getPartitionDescriptor()));
+                break;
+            case REGISTER_PARTITION_REQUEST:
+                CCNCFunctions.RegisterPartitionRequestFunction rprf =
+                        (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+                ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs,
+                        rprf.getPartitionRequest()));
+                break;
+            case REGISTER_RESULT_PARTITION_LOCATION:
+                CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+                        (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+                ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs,
+                        rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+                        rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+                break;
+            case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
+                CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc =
+                        (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+                ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
+                        rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
+                break;
+            case REPORT_RESULT_PARTITION_FAILURE:
+                CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
+                        (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+                ccs.getWorkQueue().schedule(new ReportResultPartitionFailureWork(ccs,
+                        rrpf.getJobId(), rrpf.getResultSetId(), rrpf.getPartition()));
+                break;
+            case SEND_APPLICATION_MESSAGE:
+                CCNCFunctions.SendApplicationMessageFunction rsf =
+                        (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(),
+                        rsf.getDeploymentId(), rsf.getNodeId()));
+                break;
+            case GET_NODE_CONTROLLERS_INFO:
+                ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+                        new IResultCallback<Map<String, NodeControllerInfo>>() {
+                            @Override
+                            public void setValue(Map<String, NodeControllerInfo> result) {
+                                new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
+                                        .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
+                            }
+
+                            @Override
+                            public void setException(Exception e) {
+                            }
+                        }));
+                break;
+            case STATE_DUMP_RESPONSE:
+                CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
+                ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(),
+                        dsrf.getStateDumpId(), dsrf.getState()));
+                break;
+            case SHUTDOWN_RESPONSE:
+                CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
+                ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId()));
+                break;
+            case THREAD_DUMP_RESPONSE:
+                CCNCFunctions.ThreadDumpResponseFunction tdrf =
+                        (CCNCFunctions.ThreadDumpResponseFunction)fn;
+                ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
+                        tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+                break;
+            default:
+                LOGGER.warning("Unknown function: " + fn.getFunctionId());
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ac38523..c76534b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -39,17 +39,11 @@
 
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
-import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -58,58 +52,21 @@
 import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
 import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
-import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
-import org.apache.hyracks.control.cc.work.GetJobInfoWork;
-import org.apache.hyracks.control.cc.work.GetJobStatusWork;
-import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
-import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
-import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import org.apache.hyracks.control.cc.work.GetResultStatusWork;
-import org.apache.hyracks.control.cc.work.JobStartWork;
-import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
-import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
-import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
-import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
-import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
-import org.apache.hyracks.control.cc.work.RegisterNodeWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
-import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
-import org.apache.hyracks.control.cc.work.TaskCompleteWork;
-import org.apache.hyracks.control.cc.work.TaskFailureWork;
 import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
-import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.apache.hyracks.control.common.shutdown.ShutdownRun;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.WorkQueue;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
-import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 import org.ini4j.Ini;
@@ -156,8 +113,6 @@
 
     private final IDatasetDirectoryService datasetDirectoryService;
 
-    private final JobIdFactory jobIdFactory;
-
     private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
 
     private final Map<String, StateDumpRun> stateDumpRunMap;
@@ -175,10 +130,10 @@
         nodeRegistry = new LinkedHashMap<>();
         ipAddressNodeNameMap = new HashMap<>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
-        IIPCI ccIPCI = new ClusterControllerIPCI();
+        IIPCI ccIPCI = new ClusterControllerIPCI(this);
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
-        IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
+        IIPCI ciIPCI = new ClientInterfaceIPCI(this);
         clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
@@ -208,7 +163,6 @@
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
         datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
-        jobIdFactory = new JobIdFactory();
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -442,291 +396,6 @@
         return datasetDirectoryService;
     }
 
-    private class HyracksClientInterfaceIPCI implements IIPCI {
-
-        @Override
-        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
-                Exception exception) {
-            HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
-            switch (fn.getFunctionId()) {
-                case GET_CLUSTER_CONTROLLER_INFO: {
-                    try {
-                        handle.send(mid, info, null);
-                    } catch (IPCException e) {
-                        e.printStackTrace();
-                    }
-                    return;
-                }
-
-                case CREATE_JOB:
-                    break;
-                case GET_JOB_STATUS: {
-                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
-                            (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
-                    workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
-                            new IPCResponder<JobStatus>(handle, mid)));
-                    return;
-                }
-
-                case GET_JOB_INFO: {
-                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
-                            (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
-                    workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
-                            new IPCResponder<JobInfo>(handle, mid)));
-                    return;
-                }
-
-                case START_JOB: {
-                    HyracksClientInterfaceFunctions.StartJobFunction sjf =
-                            (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                    JobId jobId = jobIdFactory.create();
-                    workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
-                            sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
-                    workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
-                            new IPCResponder<NetworkAddress>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_RESULT_STATUS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
-                            (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
-                    workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
-                            gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_RECORD_DESCRIPTOR:
-                    break;
-                case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
-                    workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
-                            gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
-                            new IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case WAIT_FOR_COMPLETION: {
-                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
-                            (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
-                    workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
-                            new IPCResponder<Object>(handle, mid)));
-                    return;
-                }
-
-                case GET_NODE_CONTROLLERS_INFO: {
-                    workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
-                            new IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case GET_CLUSTER_TOPOLOGY: {
-                    try {
-                        handle.send(mid, ccContext.getClusterTopology(), null);
-                    } catch (IPCException e) {
-                        e.printStackTrace();
-                    }
-                    return;
-                }
-
-                case CLI_DEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
-                            (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
-                    workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
-                            dbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case CLI_UNDEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
-                            (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
-                    workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
-                            new IPCResponder<>(handle, mid)));
-                    return;
-                }
-                case CLUSTER_SHUTDOWN: {
-                    HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
-                            (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
-                    workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this,
-                            csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
-                    return;
-                }
-
-                case GET_NODE_DETAILS_JSON:
-                    HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
-                            (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
-                    workQueue.schedule(new GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(),
-                            gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
-                    return;
-
-                case THREAD_DUMP:
-                    HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
-                            (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
-                    workQueue.schedule(new GetThreadDumpWork(ClusterControllerService.this, tdf.getNode(),
-                            new IPCResponder<String>(handle, mid)));
-                    return;
-            }
-            try {
-                handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
-            } catch (IPCException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private class ClusterControllerIPCI implements IIPCI {
-        @Override
-        public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
-                Exception exception) {
-            CCNCFunctions.Function fn = (Function) payload;
-            switch (fn.getFunctionId()) {
-                case REGISTER_NODE: {
-                    CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
-                    workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
-                    return;
-                }
-
-                case UNREGISTER_NODE: {
-                    CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
-                    workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
-                    return;
-                }
-
-                case NODE_HEARTBEAT: {
-                    CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
-                    workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
-                            nhf.getHeartbeatData()));
-                    return;
-                }
-
-                case NOTIFY_JOBLET_CLEANUP: {
-                    CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
-                    workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
-                            njcf.getNodeId()));
-                    return;
-                }
-
-                case NOTIFY_DEPLOY_BINARY: {
-                    CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
-                    workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
-                            ndbf.getNodeId(), ndbf.getDeploymentStatus()));
-                    return;
-                }
-
-                case REPORT_PROFILE: {
-                    CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
-                    workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
-                    return;
-                }
-
-                case NOTIFY_TASK_COMPLETE: {
-                    CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
-                    workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
-                            ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
-                    return;
-                }
-                case NOTIFY_TASK_FAILURE: {
-                    CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
-                    workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
-                            ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
-                    return;
-                }
-
-                case REGISTER_PARTITION_PROVIDER: {
-                    CCNCFunctions.RegisterPartitionProviderFunction rppf =
-                            (CCNCFunctions.RegisterPartitionProviderFunction) fn;
-                    workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
-                            rppf.getPartitionDescriptor()));
-                    return;
-                }
-
-                case REGISTER_PARTITION_REQUEST: {
-                    CCNCFunctions.RegisterPartitionRequestFunction rprf =
-                            (CCNCFunctions.RegisterPartitionRequestFunction) fn;
-                    workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
-                            rprf.getPartitionRequest()));
-                    return;
-                }
-
-                case REGISTER_RESULT_PARTITION_LOCATION: {
-                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
-                            (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
-                    workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
-                            rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
-                    return;
-                }
-
-                case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
-                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf =
-                            (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
-                    workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
-                    return;
-                }
-
-                case REPORT_RESULT_PARTITION_FAILURE: {
-                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
-                            (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
-                    workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
-                    return;
-                }
-
-                case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction rsf =
-                            (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
-                            rsf.getDeploymentId(), rsf.getNodeId()));
-                    return;
-                }
-
-                case GET_NODE_CONTROLLERS_INFO: {
-                    workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
-                            new IResultCallback<Map<String, NodeControllerInfo>>() {
-                                @Override
-                                public void setValue(Map<String, NodeControllerInfo> result) {
-                                    new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
-                                            .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
-                                }
-
-                                @Override
-                                public void setException(Exception e) {
-                                }
-                            }));
-                    return;
-                }
-
-                case STATE_DUMP_RESPONSE: {
-                    CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
-                    workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
-                            dsrf.getStateDumpId(), dsrf.getState()));
-                    return;
-                }
-
-                case SHUTDOWN_RESPONSE: {
-                    CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
-                    workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
-                    return;
-                }
-
-                case THREAD_DUMP_RESPONSE: {
-                    CCNCFunctions.ThreadDumpResponseFunction tdrf =
-                            (CCNCFunctions.ThreadDumpResponseFunction)fn;
-                    workQueue.schedule(new NotifyThreadDumpResponse(ClusterControllerService.this,
-                            tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
-                    return;
-
-                }
-            }
-            LOGGER.warning("Unknown function: " + fn.getFunctionId());
-        }
-    }
-
     public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
         stateDumpRunMap.put(id, sdr);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 21bf9c2..c6b415b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -116,7 +116,7 @@
 
     private DatasetNetworkManager datasetNetworkManager;
 
-    private final WorkQueue queue;
+    private final WorkQueue workQueue;
 
     private final Timer timer;
 
@@ -179,7 +179,7 @@
                 FullFrameChannelInterfaceFactory.INSTANCE);
 
         lccm = new LifeCycleComponentManager();
-        queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
+        workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -303,7 +303,7 @@
         }
         appCtx.setDistributedState(nodeParameters.getDistributedState());
 
-        queue.start();
+        workQueue.start();
 
         heartbeatTask = new HeartbeatTask(ccs);
 
@@ -354,7 +354,7 @@
             if (messagingNetManager != null) {
                 messagingNetManager.stop();
             }
-            queue.stop();
+            workQueue.stop();
             if (ncAppEntryPoint != null) {
                 ncAppEntryPoint.stop();
             }
@@ -409,7 +409,7 @@
     }
 
     public WorkQueue getWorkQueue() {
-        return queue;
+        return workQueue;
     }
 
     public ThreadMXBean getThreadMXBean() {
@@ -492,7 +492,7 @@
             try {
                 FutureValue<List<JobProfile>> fv = new FutureValue<>();
                 BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
-                queue.scheduleAndSync(bjpw);
+                workQueue.scheduleAndSync(bjpw);
                 List<JobProfile> profiles = fv.get();
                 if (!profiles.isEmpty()) {
                     cc.reportProfile(id, profiles);
@@ -512,30 +512,32 @@
                 case SEND_APPLICATION_MESSAGE:
                     CCNCFunctions.SendApplicationMessageFunction amf =
                             (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+                    workQueue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
 
                 case START_TASKS:
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(),
-                            stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                    workQueue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
+                            stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
+                            stf.getFlags()));
                     return;
 
                 case ABORT_TASKS:
                     CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
-                    queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+                    workQueue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
                     return;
 
                 case CLEANUP_JOBLET:
                     CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
-                    queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+                    workQueue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(),
+                            cjf.getStatus()));
                     return;
 
                 case REPORT_PARTITION_AVAILABILITY:
                     CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
                             (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
+                    workQueue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
 
@@ -552,18 +554,18 @@
 
                 case DEPLOY_BINARY:
                     CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
+                    workQueue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
                             dbf.getBinaryURLs()));
                     return;
 
                 case UNDEPLOY_BINARY:
                     CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
-                    queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+                    workQueue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
                     return;
 
                 case STATE_DUMP_REQUEST:
                     final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
-                    queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+                    workQueue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
                     return;
 
                 case SHUTDOWN_REQUEST: