Extract NodeControllerIPCI out of NodeControllerService
Change-Id: I9e7f160dfa7418cd29693a990fa17d80a14e5841
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1324
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
new file mode 100644
index 0000000..93ccaa4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.nc.task.ShutdownTask;
+import org.apache.hyracks.control.nc.task.ThreadDumpTask;
+import org.apache.hyracks.control.nc.work.AbortTasksWork;
+import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.nc.work.CleanupJobletWork;
+import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import org.apache.hyracks.control.nc.work.StartTasksWork;
+import org.apache.hyracks.control.nc.work.StateDumpWork;
+import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+
+/**
+ * Interprocess communication in a node controller
+ * This class must be refactored with each function carrying its own implementation
+ */
+final class NodeControllerIPCI implements IIPCI {
+ private final NodeControllerService ncs;
+
+ /**
+ * @param nodeControllerService
+ */
+ NodeControllerIPCI(NodeControllerService nodeControllerService) {
+ ncs = nodeControllerService;
+ }
+
+ @Override
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
+ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case SEND_APPLICATION_MESSAGE:
+ CCNCFunctions.SendApplicationMessageFunction amf =
+ (CCNCFunctions.SendApplicationMessageFunction) fn;
+ ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(),
+ amf.getDeploymentId(), amf.getNodeId()));
+ return;
+ case START_TASKS:
+ CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
+ ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(),
+ stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ return;
+ case ABORT_TASKS:
+ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
+ ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
+ return;
+ case CLEANUP_JOBLET:
+ CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
+ ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus()));
+ return;
+ case REPORT_PARTITION_AVAILABILITY:
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
+ (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+ ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs,
+ rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+ return;
+ case NODE_REGISTRATION_RESULT:
+ CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
+ ncs.setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+ return;
+
+ case GET_NODE_CONTROLLERS_INFO_RESPONSE:
+ CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
+ (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+ ncs.setNodeControllersInfo(gncirf.getNodeControllerInfos());
+ return;
+
+ case DEPLOY_BINARY:
+ CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
+ ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(),
+ dbf.getBinaryURLs()));
+ return;
+
+ case UNDEPLOY_BINARY:
+ CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
+ ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
+ return;
+
+ case STATE_DUMP_REQUEST:
+ final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
+ ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
+ return;
+
+ case SHUTDOWN_REQUEST:
+ final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
+ ncs.getExecutorService().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
+ return;
+
+ case THREAD_DUMP_REQUEST:
+ final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
+ ncs.getExecutorService().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+ return;
+
+ default:
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index c6b415b..598110a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -61,7 +61,6 @@
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.utils.PidHelper;
@@ -77,19 +76,8 @@
import org.apache.hyracks.control.nc.net.NetworkManager;
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
-import org.apache.hyracks.control.nc.task.ShutdownTask;
-import org.apache.hyracks.control.nc.task.ThreadDumpTask;
-import org.apache.hyracks.control.nc.work.AbortTasksWork;
-import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.control.nc.work.CleanupJobletWork;
-import org.apache.hyracks.control.nc.work.DeployBinaryWork;
-import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
-import org.apache.hyracks.control.nc.work.StartTasksWork;
-import org.apache.hyracks.control.nc.work.StateDumpWork;
-import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -165,8 +153,8 @@
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
id = ncConfig.nodeId;
- NodeControllerIPCI ipci = new NodeControllerIPCI();
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci,
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+ new NodeControllerIPCI(this),
new CCNCFunctions.SerializerDeserializer());
ioManager = new IOManager(getDevices(ncConfig.ioDevices));
@@ -217,7 +205,7 @@
return devices;
}
- private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
this.nodeParameters = parameters;
this.registrationException = exception;
this.registrationPending = false;
@@ -236,7 +224,7 @@
return fv.get();
}
- private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+ void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
FutureValue<Map<String, NodeControllerInfo>> fv;
synchronized (getNodeControllerInfosAcceptor) {
fv = getNodeControllerInfosAcceptor.getValue();
@@ -503,88 +491,6 @@
}
}
- private final class NodeControllerIPCI implements IIPCI {
- @Override
- public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
- Exception exception) {
- CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
- switch (fn.getFunctionId()) {
- case SEND_APPLICATION_MESSAGE:
- CCNCFunctions.SendApplicationMessageFunction amf =
- (CCNCFunctions.SendApplicationMessageFunction) fn;
- workQueue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
- amf.getDeploymentId(), amf.getNodeId()));
- return;
-
- case START_TASKS:
- CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
- workQueue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
- stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
- stf.getFlags()));
- return;
-
- case ABORT_TASKS:
- CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
- workQueue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
- return;
-
- case CLEANUP_JOBLET:
- CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
- workQueue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(),
- cjf.getStatus()));
- return;
-
- case REPORT_PARTITION_AVAILABILITY:
- CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
- (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
- workQueue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
- rpaf.getPartitionId(), rpaf.getNetworkAddress()));
- return;
-
- case NODE_REGISTRATION_RESULT:
- CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
- setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
- return;
-
- case GET_NODE_CONTROLLERS_INFO_RESPONSE:
- CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
- (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
- setNodeControllersInfo(gncirf.getNodeControllerInfos());
- return;
-
- case DEPLOY_BINARY:
- CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- workQueue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
- dbf.getBinaryURLs()));
- return;
-
- case UNDEPLOY_BINARY:
- CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
- workQueue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
- return;
-
- case STATE_DUMP_REQUEST:
- final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
- workQueue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
- return;
-
- case SHUTDOWN_REQUEST:
- final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
- executor.submit(new ShutdownTask(NodeControllerService.this, sdrf.isTerminateNCService()));
- return;
-
- case THREAD_DUMP_REQUEST:
- final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
- executor.submit(new ThreadDumpTask(NodeControllerService.this, tdrf.getRequestId()));
- return;
-
- default:
- throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
- }
-
- }
- }
-
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
ccs.sendApplicationMessageToCC(data, deploymentId, id);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
index bfc46df..02698fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
@@ -18,6 +18,24 @@
*/
package org.apache.hyracks.ipc.api;
+/**
+ * The interprocess communication interface that handles communication between different processes across the cluster
+ */
+@FunctionalInterface
public interface IIPCI {
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
+
+ /**
+ * handles the incoming message
+ * @param handle
+ * the message IPC handle
+ * @param mid
+ * the message id
+ * @param rmid
+ * the request message id (if the message is a response to a request)
+ * @param payload
+ * the message payload
+ * @param exception
+ * an exception if the message was an error message
+ */
+ void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
}