Extract IPCIs out of ClusterControllerService
moving the two IPCIs out of cluster controller service is a good
start to cleanup the class. In addition, this change renames queue
to workQueue in NodeControllerService for consistency.
Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1325
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index a7c2a36..ef0af26 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -33,13 +33,11 @@
public enum FunctionId {
GET_CLUSTER_CONTROLLER_INFO,
GET_CLUSTER_TOPOLOGY,
- CREATE_JOB,
GET_JOB_STATUS,
GET_JOB_INFO,
START_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
- GET_DATASET_RECORD_DESCRIPTOR,
GET_DATASET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO,
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
new file mode 100644
index 0000000..01c3bf5
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobStatusWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
+import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import org.apache.hyracks.control.cc.work.GetResultStatusWork;
+import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
+import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+class ClientInterfaceIPCI implements IIPCI {
+
+ private static final Logger LOGGER = Logger.getLogger(ClientInterfaceIPCI.class.getName());
+ private final ClusterControllerService ccs;
+ private final JobIdFactory jobIdFactory;
+
+ ClientInterfaceIPCI(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ jobIdFactory = new JobIdFactory();
+ }
+
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
+ HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case GET_CLUSTER_CONTROLLER_INFO:
+ try {
+ handle.send(mid, ccs.getClusterControllerInfo(), null);
+ } catch (IPCException e) {
+ LOGGER.log(Level.WARNING, "Error sending response to GET_CLUSTER_CONTROLLER_INFO request", e);
+ }
+ break;
+ case GET_JOB_STATUS:
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+ (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+ ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs, gjsf.getJobId(),
+ new IPCResponder<JobStatus>(handle, mid)));
+ break;
+ case GET_JOB_INFO:
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjif =
+ (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+ ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs, gjif.getJobId(),
+ new IPCResponder<JobInfo>(handle, mid)));
+ break;
+ case START_JOB:
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+ JobId jobId = jobIdFactory.create();
+ ccs.getWorkQueue().schedule(new JobStartWork(ccs, sjf.getDeploymentId(),
+ sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+ break;
+ case GET_DATASET_DIRECTORY_SERIVICE_INFO:
+ ccs.getWorkQueue().schedule(new GetDatasetDirectoryServiceInfoWork(ccs,
+ new IPCResponder<NetworkAddress>(handle, mid)));
+ break;
+ case GET_DATASET_RESULT_STATUS:
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrsf =
+ (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+ ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs, gdrsf.getJobId(),
+ gdrsf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
+ break;
+ case GET_DATASET_RESULT_LOCATIONS:
+ HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+ (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+ ccs.getWorkQueue().schedule(new GetResultPartitionLocationsWork(ccs,
+ gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+ new IPCResponder<>(handle, mid)));
+ break;
+ case WAIT_FOR_COMPLETION:
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+ (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs, wfcf.getJobId(),
+ new IPCResponder<>(handle, mid)));
+ break;
+ case GET_NODE_CONTROLLERS_INFO:
+ ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+ new IPCResponder<>(handle, mid)));
+ break;
+ case GET_CLUSTER_TOPOLOGY:
+ try {
+ handle.send(mid, ccs.getCCContext().getClusterTopology(), null);
+ } catch (IPCException e) {
+ LOGGER.log(Level.WARNING, "Error sending response to GET_CLUSTER_TOPOLOGY request", e);
+ }
+ break;
+ case CLI_DEPLOY_BINARY:
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+ (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+ ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(),
+ dbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
+ break;
+ case CLI_UNDEPLOY_BINARY:
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
+ (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+ ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs, udbf.getDeploymentId(),
+ new IPCResponder<>(handle, mid)));
+ break;
+ case CLUSTER_SHUTDOWN:
+ HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
+ (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
+ ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs,
+ csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
+ break;
+ case GET_NODE_DETAILS_JSON:
+ HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
+ (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
+ ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs, gndjf.getNodeId(),
+ gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
+ break;
+ case THREAD_DUMP:
+ HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
+ (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
+ ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs, tdf.getNode(),
+ new IPCResponder<String>(handle, mid)));
+ break;
+ default:
+ try {
+ handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
+ } catch (IPCException e) {
+ LOGGER.log(Level.WARNING, "Error sending Unknown function response", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
new file mode 100644
index 0000000..b6c9a08
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
+import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
+import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
+import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
+import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
+import org.apache.hyracks.control.cc.work.RegisterNodeWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
+import org.apache.hyracks.control.cc.work.ReportProfilesWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
+import org.apache.hyracks.control.cc.work.TaskCompleteWork;
+import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+
+class ClusterControllerIPCI implements IIPCI {
+ private static final Logger LOGGER = Logger.getLogger(ClusterControllerIPCI.class.getName());
+ private final ClusterControllerService ccs;
+
+ ClusterControllerIPCI(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
+ CCNCFunctions.Function fn = (Function) payload;
+ switch (fn.getFunctionId()) {
+ case REGISTER_NODE:
+ CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
+ ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs, rnf.getNodeRegistration()));
+ break;
+ case UNREGISTER_NODE:
+ CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
+ ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs, unf.getNodeId()));
+ break;
+ case NODE_HEARTBEAT:
+ CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
+ ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs, nhf.getNodeId(),
+ nhf.getHeartbeatData()));
+ break;
+ case NOTIFY_JOBLET_CLEANUP:
+ CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
+ ccs.getWorkQueue().schedule(new JobletCleanupNotificationWork(ccs, njcf.getJobId(),
+ njcf.getNodeId()));
+ break;
+ case NOTIFY_DEPLOY_BINARY:
+ CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
+ ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs, ndbf.getDeploymentId(),
+ ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+ break;
+ case REPORT_PROFILE:
+ CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
+ ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs, rpf.getProfiles()));
+ break;
+ case NOTIFY_TASK_COMPLETE:
+ CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
+ ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs, ntcf.getJobId(),
+ ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+ break;
+ case NOTIFY_TASK_FAILURE:
+ CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
+ ccs.getWorkQueue().schedule(new TaskFailureWork(ccs, ntff.getJobId(),
+ ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+ break;
+ case REGISTER_PARTITION_PROVIDER:
+ CCNCFunctions.RegisterPartitionProviderFunction rppf =
+ (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+ ccs.getWorkQueue().schedule(new RegisterPartitionAvailibilityWork(ccs,
+ rppf.getPartitionDescriptor()));
+ break;
+ case REGISTER_PARTITION_REQUEST:
+ CCNCFunctions.RegisterPartitionRequestFunction rprf =
+ (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+ ccs.getWorkQueue().schedule(new RegisterPartitionRequestWork(ccs,
+ rprf.getPartitionRequest()));
+ break;
+ case REGISTER_RESULT_PARTITION_LOCATION:
+ CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+ (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ ccs.getWorkQueue().schedule(new RegisterResultPartitionLocationWork(ccs,
+ rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+ rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+ break;
+ case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrpwc =
+ (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+ ccs.getWorkQueue().schedule(new ReportResultPartitionWriteCompletionWork(ccs,
+ rrpwc.getJobId(), rrpwc.getResultSetId(), rrpwc.getPartition()));
+ break;
+ case REPORT_RESULT_PARTITION_FAILURE:
+ CCNCFunctions.ReportResultPartitionFailureFunction rrpf =
+ (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+ ccs.getWorkQueue().schedule(new ReportResultPartitionFailureWork(ccs,
+ rrpf.getJobId(), rrpf.getResultSetId(), rrpf.getPartition()));
+ break;
+ case SEND_APPLICATION_MESSAGE:
+ CCNCFunctions.SendApplicationMessageFunction rsf =
+ (CCNCFunctions.SendApplicationMessageFunction) fn;
+ ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs, rsf.getMessage(),
+ rsf.getDeploymentId(), rsf.getNodeId()));
+ break;
+ case GET_NODE_CONTROLLERS_INFO:
+ ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+ new IResultCallback<Map<String, NodeControllerInfo>>() {
+ @Override
+ public void setValue(Map<String, NodeControllerInfo> result) {
+ new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
+ .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
+ }
+
+ @Override
+ public void setException(Exception e) {
+ }
+ }));
+ break;
+ case STATE_DUMP_RESPONSE:
+ CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
+ ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs, dsrf.getNodeId(),
+ dsrf.getStateDumpId(), dsrf.getState()));
+ break;
+ case SHUTDOWN_RESPONSE:
+ CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
+ ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs, sdrf.getNodeId()));
+ break;
+ case THREAD_DUMP_RESPONSE:
+ CCNCFunctions.ThreadDumpResponseFunction tdrf =
+ (CCNCFunctions.ThreadDumpResponseFunction)fn;
+ ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
+ tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+ break;
+ default:
+ LOGGER.warning("Unknown function: " + fn.getFunctionId());
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ac38523..c76534b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -39,17 +39,11 @@
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
-import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -58,58 +52,21 @@
import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
-import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
-import org.apache.hyracks.control.cc.work.GetJobInfoWork;
-import org.apache.hyracks.control.cc.work.GetJobStatusWork;
-import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
-import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
-import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import org.apache.hyracks.control.cc.work.GetResultStatusWork;
-import org.apache.hyracks.control.cc.work.JobStartWork;
-import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
-import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
-import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
-import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
-import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
-import org.apache.hyracks.control.cc.work.RegisterNodeWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
-import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
-import org.apache.hyracks.control.cc.work.TaskCompleteWork;
-import org.apache.hyracks.control.cc.work.TaskFailureWork;
import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
-import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.IniUtils;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
import org.apache.hyracks.control.common.logs.LogFile;
import org.apache.hyracks.control.common.shutdown.ShutdownRun;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.WorkQueue;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
-import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
import org.ini4j.Ini;
@@ -156,8 +113,6 @@
private final IDatasetDirectoryService datasetDirectoryService;
- private final JobIdFactory jobIdFactory;
-
private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
private final Map<String, StateDumpRun> stateDumpRunMap;
@@ -175,10 +130,10 @@
nodeRegistry = new LinkedHashMap<>();
ipAddressNodeNameMap = new HashMap<>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
- IIPCI ccIPCI = new ClusterControllerIPCI();
+ IIPCI ccIPCI = new ClusterControllerIPCI(this);
clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
- IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
+ IIPCI ciIPCI = new ClientInterfaceIPCI(this);
clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
@@ -208,7 +163,6 @@
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
- jobIdFactory = new JobIdFactory();
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -442,291 +396,6 @@
return datasetDirectoryService;
}
- private class HyracksClientInterfaceIPCI implements IIPCI {
-
- @Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
- Exception exception) {
- HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
- switch (fn.getFunctionId()) {
- case GET_CLUSTER_CONTROLLER_INFO: {
- try {
- handle.send(mid, info, null);
- } catch (IPCException e) {
- e.printStackTrace();
- }
- return;
- }
-
- case CREATE_JOB:
- break;
- case GET_JOB_STATUS: {
- HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
- (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
- workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
- new IPCResponder<JobStatus>(handle, mid)));
- return;
- }
-
- case GET_JOB_INFO: {
- HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
- (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
- workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
- new IPCResponder<JobInfo>(handle, mid)));
- return;
- }
-
- case START_JOB: {
- HyracksClientInterfaceFunctions.StartJobFunction sjf =
- (HyracksClientInterfaceFunctions.StartJobFunction) fn;
- JobId jobId = jobIdFactory.create();
- workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
- sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
- return;
- }
-
- case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
- workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
- new IPCResponder<NetworkAddress>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_STATUS: {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
- (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
- gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RECORD_DESCRIPTOR:
- break;
- case GET_DATASET_RESULT_LOCATIONS: {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
- (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
- gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
- new IPCResponder<>(handle, mid)));
- return;
- }
-
- case WAIT_FOR_COMPLETION: {
- HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
- (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
- workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
- new IPCResponder<Object>(handle, mid)));
- return;
- }
-
- case GET_NODE_CONTROLLERS_INFO: {
- workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
- new IPCResponder<>(handle, mid)));
- return;
- }
-
- case GET_CLUSTER_TOPOLOGY: {
- try {
- handle.send(mid, ccContext.getClusterTopology(), null);
- } catch (IPCException e) {
- e.printStackTrace();
- }
- return;
- }
-
- case CLI_DEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
- (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
- workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
- dbf.getDeploymentId(), new IPCResponder<>(handle, mid)));
- return;
- }
-
- case CLI_UNDEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
- (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
- workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
- new IPCResponder<>(handle, mid)));
- return;
- }
- case CLUSTER_SHUTDOWN: {
- HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
- (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
- workQueue.schedule(new ClusterShutdownWork(ClusterControllerService.this,
- csf.isTerminateNCService(), new IPCResponder<>(handle, mid)));
- return;
- }
-
- case GET_NODE_DETAILS_JSON:
- HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction gndjf =
- (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
- workQueue.schedule(new GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(),
- gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new IPCResponder<>(handle, mid)));
- return;
-
- case THREAD_DUMP:
- HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
- (HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
- workQueue.schedule(new GetThreadDumpWork(ClusterControllerService.this, tdf.getNode(),
- new IPCResponder<String>(handle, mid)));
- return;
- }
- try {
- handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
- } catch (IPCException e) {
- e.printStackTrace();
- }
- }
- }
-
- private class ClusterControllerIPCI implements IIPCI {
- @Override
- public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
- Exception exception) {
- CCNCFunctions.Function fn = (Function) payload;
- switch (fn.getFunctionId()) {
- case REGISTER_NODE: {
- CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
- workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
- return;
- }
-
- case UNREGISTER_NODE: {
- CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
- workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
- return;
- }
-
- case NODE_HEARTBEAT: {
- CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
- workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
- nhf.getHeartbeatData()));
- return;
- }
-
- case NOTIFY_JOBLET_CLEANUP: {
- CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
- workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
- njcf.getNodeId()));
- return;
- }
-
- case NOTIFY_DEPLOY_BINARY: {
- CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
- workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
- ndbf.getNodeId(), ndbf.getDeploymentStatus()));
- return;
- }
-
- case REPORT_PROFILE: {
- CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
- workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
- return;
- }
-
- case NOTIFY_TASK_COMPLETE: {
- CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
- workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
- ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
- return;
- }
- case NOTIFY_TASK_FAILURE: {
- CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
- workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
- ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
- return;
- }
-
- case REGISTER_PARTITION_PROVIDER: {
- CCNCFunctions.RegisterPartitionProviderFunction rppf =
- (CCNCFunctions.RegisterPartitionProviderFunction) fn;
- workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
- rppf.getPartitionDescriptor()));
- return;
- }
-
- case REGISTER_PARTITION_REQUEST: {
- CCNCFunctions.RegisterPartitionRequestFunction rprf =
- (CCNCFunctions.RegisterPartitionRequestFunction) fn;
- workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
- rprf.getPartitionRequest()));
- return;
- }
-
- case REGISTER_RESULT_PARTITION_LOCATION: {
- CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
- (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
- workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
- rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf =
- (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
- workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_FAILURE: {
- CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
- (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
- workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
- return;
- }
-
- case SEND_APPLICATION_MESSAGE: {
- CCNCFunctions.SendApplicationMessageFunction rsf =
- (CCNCFunctions.SendApplicationMessageFunction) fn;
- workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
- rsf.getDeploymentId(), rsf.getNodeId()));
- return;
- }
-
- case GET_NODE_CONTROLLERS_INFO: {
- workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
- new IResultCallback<Map<String, NodeControllerInfo>>() {
- @Override
- public void setValue(Map<String, NodeControllerInfo> result) {
- new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
- .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
- }
-
- @Override
- public void setException(Exception e) {
- }
- }));
- return;
- }
-
- case STATE_DUMP_RESPONSE: {
- CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
- workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
- dsrf.getStateDumpId(), dsrf.getState()));
- return;
- }
-
- case SHUTDOWN_RESPONSE: {
- CCNCFunctions.ShutdownResponseFunction sdrf = (ShutdownResponseFunction) fn;
- workQueue.schedule(new NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
- return;
- }
-
- case THREAD_DUMP_RESPONSE: {
- CCNCFunctions.ThreadDumpResponseFunction tdrf =
- (CCNCFunctions.ThreadDumpResponseFunction)fn;
- workQueue.schedule(new NotifyThreadDumpResponse(ClusterControllerService.this,
- tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
- return;
-
- }
- }
- LOGGER.warning("Unknown function: " + fn.getFunctionId());
- }
- }
-
public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
stateDumpRunMap.put(id, sdr);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 21bf9c2..c6b415b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -116,7 +116,7 @@
private DatasetNetworkManager datasetNetworkManager;
- private final WorkQueue queue;
+ private final WorkQueue workQueue;
private final Timer timer;
@@ -179,7 +179,7 @@
FullFrameChannelInterfaceFactory.INSTANCE);
lccm = new LifeCycleComponentManager();
- queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
+ workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -303,7 +303,7 @@
}
appCtx.setDistributedState(nodeParameters.getDistributedState());
- queue.start();
+ workQueue.start();
heartbeatTask = new HeartbeatTask(ccs);
@@ -354,7 +354,7 @@
if (messagingNetManager != null) {
messagingNetManager.stop();
}
- queue.stop();
+ workQueue.stop();
if (ncAppEntryPoint != null) {
ncAppEntryPoint.stop();
}
@@ -409,7 +409,7 @@
}
public WorkQueue getWorkQueue() {
- return queue;
+ return workQueue;
}
public ThreadMXBean getThreadMXBean() {
@@ -492,7 +492,7 @@
try {
FutureValue<List<JobProfile>> fv = new FutureValue<>();
BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
- queue.scheduleAndSync(bjpw);
+ workQueue.scheduleAndSync(bjpw);
List<JobProfile> profiles = fv.get();
if (!profiles.isEmpty()) {
cc.reportProfile(id, profiles);
@@ -512,30 +512,32 @@
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction amf =
(CCNCFunctions.SendApplicationMessageFunction) fn;
- queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+ workQueue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
amf.getDeploymentId(), amf.getNodeId()));
return;
case START_TASKS:
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
- queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(),
- stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ workQueue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
+ stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
+ stf.getFlags()));
return;
case ABORT_TASKS:
CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
- queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+ workQueue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
return;
case CLEANUP_JOBLET:
CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
- queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+ workQueue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(),
+ cjf.getStatus()));
return;
case REPORT_PARTITION_AVAILABILITY:
CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
(CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
- queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
+ workQueue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
rpaf.getPartitionId(), rpaf.getNetworkAddress()));
return;
@@ -552,18 +554,18 @@
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
+ workQueue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
dbf.getBinaryURLs()));
return;
case UNDEPLOY_BINARY:
CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
- queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+ workQueue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
return;
case STATE_DUMP_REQUEST:
final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
- queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+ workQueue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
return;
case SHUTDOWN_REQUEST: