[NO ISSUE][OTH] Add CC/NC Ping Function

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add ping function that can be used to ask a node
  to ping the CC.

Change-Id: I676e523dccbf94d1e5af4ea408e026af260c9b06
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2831
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 84cb4bd..7e5d22c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -161,6 +161,10 @@
                 ccs.getWorkQueue()
                         .schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
                 break;
+            case PING_RESPONSE:
+                CCNCFunctions.PingResponseFunction prf = (CCNCFunctions.PingResponseFunction) fn;
+                LOGGER.debug("Received ping response from node {}", prf.getNodeId());
+                break;
             default:
                 LOGGER.warn("Unknown function: " + fn.getFunctionId());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fc0154e..fbaff55 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -72,4 +72,6 @@
     void getNodeControllerInfos() throws Exception;
 
     void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+
+    void notifyPingResponse(String nodeId) throws Exception;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index fa835f4..d7941f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -84,4 +84,13 @@
      * @throws IPCException
      */
     void sendRegistrationResult(NodeParameters parameters, Exception regFailure) throws IPCException;
+
+    /**
+     * Sends a request to this {@link INodeController} to ping the
+     * cluster controller with id {@code ccId}
+     *
+     * @param ccId
+     * @throws IPCException
+     */
+    void ping(CcId ccId) throws IPCException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index ce4578d..2522ebe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -113,6 +113,9 @@
         THREAD_DUMP_REQUEST,
         THREAD_DUMP_RESPONSE,
 
+        PING_REQUEST,
+        PING_RESPONSE,
+
         OTHER
     }
 
@@ -1316,6 +1319,19 @@
         }
     }
 
+    public static class PingFunction extends CCIdentifiedFunction {
+        private static final long serialVersionUID = 1L;
+
+        public PingFunction(CcId ccId) {
+            super(ccId);
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.PING_REQUEST;
+        }
+    }
+
     public static class ShutdownRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
@@ -1355,6 +1371,25 @@
         }
     }
 
+    public static class PingResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        public PingResponseFunction(String nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.PING_RESPONSE;
+        }
+    }
+
     public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
         private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 8e2ec22..06904c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -175,6 +175,12 @@
     }
 
     @Override
+    public void notifyPingResponse(String nodeId) throws Exception {
+        CCNCFunctions.PingResponseFunction fn = new CCNCFunctions.PingResponseFunction(nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public String toString() {
         return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]";
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 8242bdc..dd10020 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -148,6 +148,11 @@
         ipcHandle.send(-1, new CCNCFunctions.NodeRegistrationResult(parameters, regFailure), null);
     }
 
+    @Override
+    public void ping(CcId ccId) throws IPCException {
+        ipcHandle.send(-1, new CCNCFunctions.PingFunction(ccId), null);
+    }
+
     public InetSocketAddress getAddress() {
         return ipcHandle.getRemoteAddress();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 08cd5d8..cdc16fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
 import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
+import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -133,6 +134,11 @@
                 ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId()));
                 return;
 
+            case PING_REQUEST:
+                final CCNCFunctions.PingFunction pcf = (CCNCFunctions.PingFunction) fn;
+                ncs.getExecutor().submit(new PingTask(ncs, pcf.getCcId()));
+                return;
+
             default:
                 throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java
new file mode 100644
index 0000000..15c62bd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.task;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PingTask implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+
+    public PingTask(NodeControllerService ncs, CcId ccId) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.getClusterController(ccId).notifyPingResponse(ncs.getId());
+        } catch (Exception e) {
+            LOGGER.info("failed to respond to ping from cc {}", ccId, e);
+        }
+    }
+}
\ No newline at end of file