[NO ISSUE][OTH] Add CC/NC Ping Function
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add ping function that can be used to ask a node
to ping the CC.
Change-Id: I676e523dccbf94d1e5af4ea408e026af260c9b06
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2831
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 84cb4bd..7e5d22c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -161,6 +161,10 @@
ccs.getWorkQueue()
.schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
break;
+ case PING_RESPONSE:
+ CCNCFunctions.PingResponseFunction prf = (CCNCFunctions.PingResponseFunction) fn;
+ LOGGER.debug("Received ping response from node {}", prf.getNodeId());
+ break;
default:
LOGGER.warn("Unknown function: " + fn.getFunctionId());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fc0154e..fbaff55 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -72,4 +72,6 @@
void getNodeControllerInfos() throws Exception;
void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+
+ void notifyPingResponse(String nodeId) throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index fa835f4..d7941f2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -84,4 +84,13 @@
* @throws IPCException
*/
void sendRegistrationResult(NodeParameters parameters, Exception regFailure) throws IPCException;
+
+ /**
+ * Sends a request to this {@link INodeController} to ping the
+ * cluster controller with id {@code ccId}
+ *
+ * @param ccId
+ * @throws IPCException
+ */
+ void ping(CcId ccId) throws IPCException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index ce4578d..2522ebe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -113,6 +113,9 @@
THREAD_DUMP_REQUEST,
THREAD_DUMP_RESPONSE,
+ PING_REQUEST,
+ PING_RESPONSE,
+
OTHER
}
@@ -1316,6 +1319,19 @@
}
}
+ public static class PingFunction extends CCIdentifiedFunction {
+ private static final long serialVersionUID = 1L;
+
+ public PingFunction(CcId ccId) {
+ super(ccId);
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.PING_REQUEST;
+ }
+ }
+
public static class ShutdownRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
@@ -1355,6 +1371,25 @@
}
}
+ public static class PingResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public PingResponseFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.PING_RESPONSE;
+ }
+ }
+
public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 8e2ec22..06904c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -175,6 +175,12 @@
}
@Override
+ public void notifyPingResponse(String nodeId) throws Exception {
+ CCNCFunctions.PingResponseFunction fn = new CCNCFunctions.PingResponseFunction(nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]";
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 8242bdc..dd10020 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -148,6 +148,11 @@
ipcHandle.send(-1, new CCNCFunctions.NodeRegistrationResult(parameters, regFailure), null);
}
+ @Override
+ public void ping(CcId ccId) throws IPCException {
+ ipcHandle.send(-1, new CCNCFunctions.PingFunction(ccId), null);
+ }
+
public InetSocketAddress getAddress() {
return ipcHandle.getRemoteAddress();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 08cd5d8..cdc16fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.control.nc.work.CleanupJobletWork;
import org.apache.hyracks.control.nc.work.DeployBinaryWork;
import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
+import org.apache.hyracks.control.nc.task.PingTask;
import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import org.apache.hyracks.control.nc.work.StartTasksWork;
import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -133,6 +134,11 @@
ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId()));
return;
+ case PING_REQUEST:
+ final CCNCFunctions.PingFunction pcf = (CCNCFunctions.PingFunction) fn;
+ ncs.getExecutor().submit(new PingTask(ncs, pcf.getCcId()));
+ return;
+
default:
throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java
new file mode 100644
index 0000000..15c62bd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.task;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PingTask implements Runnable {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final NodeControllerService ncs;
+ private final CcId ccId;
+
+ public PingTask(NodeControllerService ncs, CcId ccId) {
+ this.ncs = ncs;
+ this.ccId = ccId;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ncs.getClusterController(ccId).notifyPingResponse(ncs.getId());
+ } catch (Exception e) {
+ LOGGER.info("failed to respond to ping from cc {}", ccId, e);
+ }
+ }
+}
\ No newline at end of file