Extract NodeControllerIPCI out of NodeControllerService

Change-Id: I9e7f160dfa7418cd29693a990fa17d80a14e5841
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1324
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
new file mode 100644
index 0000000..93ccaa4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.nc.task.ShutdownTask;
+import org.apache.hyracks.control.nc.task.ThreadDumpTask;
+import org.apache.hyracks.control.nc.work.AbortTasksWork;
+import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.nc.work.CleanupJobletWork;
+import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import org.apache.hyracks.control.nc.work.StartTasksWork;
+import org.apache.hyracks.control.nc.work.StateDumpWork;
+import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+
+/**
+ * Interprocess communication in a node controller
+ * This class must be refactored with each function carrying its own implementation
+ */
+final class NodeControllerIPCI implements IIPCI {
+    private final NodeControllerService ncs;
+
+    /**
+     * @param nodeControllerService
+     */
+    NodeControllerIPCI(NodeControllerService nodeControllerService) {
+        ncs = nodeControllerService;
+    }
+
+    @Override
+    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+            Exception exception) {
+        CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
+        switch (fn.getFunctionId()) {
+            case SEND_APPLICATION_MESSAGE:
+                CCNCFunctions.SendApplicationMessageFunction amf =
+                        (CCNCFunctions.SendApplicationMessageFunction) fn;
+                ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(),
+                        amf.getDeploymentId(), amf.getNodeId()));
+                return;
+            case START_TASKS:
+                CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
+                ncs.getWorkQueue().schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(),
+                        stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                return;
+            case ABORT_TASKS:
+                CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
+                ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
+                return;
+            case CLEANUP_JOBLET:
+                CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
+                ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus()));
+                return;
+            case REPORT_PARTITION_AVAILABILITY:
+                CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
+                        (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs,
+                        rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+                return;
+            case NODE_REGISTRATION_RESULT:
+                CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
+                ncs.setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+                return;
+
+            case GET_NODE_CONTROLLERS_INFO_RESPONSE:
+                CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
+                        (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                ncs.setNodeControllersInfo(gncirf.getNodeControllerInfos());
+                return;
+
+            case DEPLOY_BINARY:
+                CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
+                ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(),
+                        dbf.getBinaryURLs()));
+                return;
+
+            case UNDEPLOY_BINARY:
+                CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
+                ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
+                return;
+
+            case STATE_DUMP_REQUEST:
+                final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
+                ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
+                return;
+
+            case SHUTDOWN_REQUEST:
+                final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
+                ncs.getExecutorService().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
+                return;
+
+            case THREAD_DUMP_REQUEST:
+                final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
+                ncs.getExecutorService().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+                return;
+
+            default:
+                throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index c6b415b..598110a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -61,7 +61,6 @@
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.utils.PidHelper;
@@ -77,19 +76,8 @@
 import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
-import org.apache.hyracks.control.nc.task.ShutdownTask;
-import org.apache.hyracks.control.nc.task.ThreadDumpTask;
-import org.apache.hyracks.control.nc.work.AbortTasksWork;
-import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.control.nc.work.CleanupJobletWork;
-import org.apache.hyracks.control.nc.work.DeployBinaryWork;
-import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
-import org.apache.hyracks.control.nc.work.StartTasksWork;
-import org.apache.hyracks.control.nc.work.StateDumpWork;
-import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
 import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -165,8 +153,8 @@
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
-        NodeControllerIPCI ipci = new NodeControllerIPCI();
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort), ipci,
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+                new NodeControllerIPCI(this),
                 new CCNCFunctions.SerializerDeserializer());
 
         ioManager = new IOManager(getDevices(ncConfig.ioDevices));
@@ -217,7 +205,7 @@
         return devices;
     }
 
-    private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+    synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
         this.nodeParameters = parameters;
         this.registrationException = exception;
         this.registrationPending = false;
@@ -236,7 +224,7 @@
         return fv.get();
     }
 
-    private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+    void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
         FutureValue<Map<String, NodeControllerInfo>> fv;
         synchronized (getNodeControllerInfosAcceptor) {
             fv = getNodeControllerInfosAcceptor.getValue();
@@ -503,88 +491,6 @@
         }
     }
 
-    private final class NodeControllerIPCI implements IIPCI {
-        @Override
-        public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
-                Exception exception) {
-            CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
-            switch (fn.getFunctionId()) {
-                case SEND_APPLICATION_MESSAGE:
-                    CCNCFunctions.SendApplicationMessageFunction amf =
-                            (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    workQueue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
-                            amf.getDeploymentId(), amf.getNodeId()));
-                    return;
-
-                case START_TASKS:
-                    CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                    workQueue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
-                            stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
-                            stf.getFlags()));
-                    return;
-
-                case ABORT_TASKS:
-                    CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
-                    workQueue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
-                    return;
-
-                case CLEANUP_JOBLET:
-                    CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
-                    workQueue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(),
-                            cjf.getStatus()));
-                    return;
-
-                case REPORT_PARTITION_AVAILABILITY:
-                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
-                            (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                    workQueue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
-                            rpaf.getPartitionId(), rpaf.getNetworkAddress()));
-                    return;
-
-                case NODE_REGISTRATION_RESULT:
-                    CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
-                    setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
-                    return;
-
-                case GET_NODE_CONTROLLERS_INFO_RESPONSE:
-                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
-                            (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
-                    setNodeControllersInfo(gncirf.getNodeControllerInfos());
-                    return;
-
-                case DEPLOY_BINARY:
-                    CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                    workQueue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
-                            dbf.getBinaryURLs()));
-                    return;
-
-                case UNDEPLOY_BINARY:
-                    CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
-                    workQueue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
-                    return;
-
-                case STATE_DUMP_REQUEST:
-                    final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
-                    workQueue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
-                    return;
-
-                case SHUTDOWN_REQUEST:
-                    final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
-                    executor.submit(new ShutdownTask(NodeControllerService.this, sdrf.isTerminateNCService()));
-                    return;
-
-                case THREAD_DUMP_REQUEST:
-                    final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
-                    executor.submit(new ThreadDumpTask(NodeControllerService.this, tdrf.getRequestId()));
-                    return;
-
-                default:
-                    throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
-            }
-
-        }
-    }
-
     public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
         ccs.sendApplicationMessageToCC(data, deploymentId, id);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
index bfc46df..02698fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
@@ -18,6 +18,24 @@
  */
 package org.apache.hyracks.ipc.api;
 
+/**
+ * The interprocess communication interface that handles communication between different processes across the cluster
+ */
+@FunctionalInterface
 public interface IIPCI {
-    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
+
+    /**
+     * handles the incoming message
+     * @param handle
+     *            the message IPC handle
+     * @param mid
+     *            the message id
+     * @param rmid
+     *            the request message id (if the message is a response to a request)
+     * @param payload
+     *            the message payload
+     * @param exception
+     *            an exception if the message was an error message
+     */
+    void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
 }