[NO ISSUE] Refactor IPC reconnect logic to be usable by all IPC connections
Change-Id: I2430510b22f936b89879df98322ef51ec87c6da6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2284
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 8a8342e..83a40f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -45,7 +45,6 @@
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -154,7 +153,7 @@
@Override
protected void handleExecuteStatementException(Throwable t, RequestExecutionState execution) {
if (t instanceof TimeoutException
- || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) {
+ || ExceptionUtils.matchingCause(t, candidate -> candidate instanceof IPCException)) {
GlobalConfig.ASTERIX_LOGGER.log(Level.WARN, t.toString(), t);
execution.setStatus(ResultStatus.FAILED, HttpResponseStatus.SERVICE_UNAVAILABLE);
} else {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index 3105b3f..256ce08 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.exceptions;
+import java.util.function.Predicate;
+
public class ExceptionUtils {
public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
public static final String PARAMETER_NAME = "Parameter name: ";
@@ -49,4 +51,21 @@
}
return current;
}
+
+ /**
+ * Determines whether supplied exception contains a matching cause in its hierarchy, or is itself a match
+ */
+ public static boolean matchingCause(Throwable e, Predicate<Throwable> test) {
+ Throwable current = e;
+ Throwable cause = e.getCause();
+ while (cause != null && cause != current) {
+ if (test.test(cause)) {
+ return true;
+ }
+ Throwable nextCause = current.getCause();
+ current = cause;
+ cause = nextCause;
+ }
+ return test.test(e);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 85ef927..80b61f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -83,7 +83,8 @@
RPCInterface rpci = new RPCInterface();
ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
- hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci);
+ hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)),
+ rpci);
ccInfo = hci.getClusterControllerInfo();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 075747d..63139d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -40,7 +40,7 @@
RPCInterface rpci = new RPCInterface();
ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
- IIPCHandle ddsIpchandle = ipc.getHandle(new InetSocketAddress(ddsHost, ddsPort));
+ IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(ddsHost, ddsPort));
this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(ddsIpchandle, rpci);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 590a0f3..712d2ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -101,6 +101,7 @@
removeDeadNode(nodeId);
} else {
try {
+ // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
} catch (IPCException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 07b0f04..3a38287 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -50,13 +50,15 @@
@Override
protected void doRun() throws Exception {
String id = reg.getNodeId();
+ // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
CCNCFunctions.NodeRegistrationResult result;
Map<IOption, Object> ncConfiguration = new HashMap<>();
try {
LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
NodeControllerRemoteProxy nc =
- new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+ new NodeControllerRemoteProxy(
+ ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
NodeControllerState state = new NodeControllerState(nc, reg);
INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 447d678..f2e7d87 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,6 @@
*/
package org.apache.hyracks.control.common.ipc;
-import java.net.InetSocketAddress;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -53,42 +52,26 @@
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.ipc.api.IIPCHandle;
-public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implements IClusterController {
- private static final Logger LOGGER = LogManager.getLogger();
+public class ClusterControllerRemoteProxy implements IClusterController {
- private final int clusterConnectRetries;
+ private IIPCHandle ipcHandle;
- public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries,
- IControllerRemoteProxyIPCEventListener eventListener) {
- super(ipc, inetSocketAddress, eventListener);
- this.clusterConnectRetries = clusterConnectRetries;
- }
-
- @Override
- protected int getMaxRetries(boolean first) {
- // -1 == retry forever
- return first ? clusterConnectRetries : 0;
- }
-
- @Override
- protected Logger getLogger() {
- return LOGGER;
+ public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
}
@Override
public void registerNode(NodeRegistration reg) throws Exception {
RegisterNodeFunction fn = new RegisterNodeFunction(reg);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void unregisterNode(String nodeId) throws Exception {
UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
@@ -96,7 +79,7 @@
throws Exception {
NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, taskId,
nodeId, statistics);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
@@ -104,53 +87,53 @@
throws Exception {
NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, taskId, nodeId,
exceptions);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
NotifyJobletCleanupFunction fn = new NotifyJobletCleanupFunction(jobId, nodeId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception {
NotifyDeployBinaryFunction fn = new NotifyDeployBinaryFunction(deploymentId, nodeId,
status);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
- ensureIpcHandle(0).send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
ReportProfileFunction fn = new ReportProfileFunction(id, profiles);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
RegisterPartitionProviderFunction fn = new RegisterPartitionProviderFunction(
partitionDescriptor);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
RegisterPartitionRequestFunction fn = new RegisterPartitionRequestFunction(
partitionRequest);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
deploymentId, nodeId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
@@ -158,44 +141,44 @@
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception {
RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(
jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction(
jobId, rsId, partition);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception {
ReportDeployedJobSpecFailureFunction fn = new ReportDeployedJobSpecFailureFunction(deployedJobSpecId, nodeId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void getNodeControllerInfos() throws Exception {
- ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null);
+ ipcHandle.send(-1, new GetNodeControllersInfoFunction(), null);
}
@Override
public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception {
StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, stateDumpId,
state);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void notifyShutdown(String nodeId) throws Exception {
ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId);
- ensureIpcHandle().send(-1, sdrf, null);
+ ipcHandle.send(-1, sdrf, null);
}
@Override
public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception {
ThreadDumpResponseFunction tdrf = new ThreadDumpResponseFunction(nodeId, requestId,
threadDumpJSON);
- ensureIpcHandle().send(-1, tdrf, null);
+ ipcHandle.send(-1, tdrf, null);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
deleted file mode 100644
index fe9e85a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.ipc;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.Logger;
-
-public abstract class ControllerRemoteProxy {
- protected final IPCSystem ipc;
- private final InetSocketAddress inetSocketAddress;
- private final IControllerRemoteProxyIPCEventListener eventListener;
- private IIPCHandle ipcHandle;
-
- protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
- this(ipc, inetSocketAddress, null);
- }
-
- protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
- IControllerRemoteProxyIPCEventListener eventListener) {
- this.ipc = ipc;
- this.inetSocketAddress = inetSocketAddress;
- this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {
- } : eventListener;
- }
-
- protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
- return ensureIpcHandle(getMaxRetries(ipcHandle == null));
- }
-
- protected IIPCHandle ensureIpcHandle(int maxRetries) throws HyracksDataException {
- if (ipcHandle != null && ipcHandle.isConnected()) {
- return ipcHandle;
- }
- try {
- final boolean first = ipcHandle == null;
- if (!first) {
- getLogger().warn("ipcHandle " + ipcHandle + " disconnected; retrying connection");
- eventListener.ipcHandleDisconnected(ipcHandle);
- }
- ipcHandle = ipc.getHandle(inetSocketAddress, maxRetries);
- if (first) {
- eventListener.ipcHandleConnected(ipcHandle);
- } else {
- getLogger().warn("ipcHandle " + ipcHandle + " restored");
- eventListener.ipcHandleRestored(ipcHandle);
- }
- } catch (IPCException e) {
- throw HyracksDataException.create(e);
- }
- return ipcHandle;
- }
-
- /**
- * Maximum number of times to retry a failed connection attempt
- * @param first true if the initial connection attempt (i.e. server start)
- * @return the maximum number of retries, if any. <0 means retry forever
- */
- protected abstract int getMaxRetries(boolean first);
-
- protected abstract Logger getLogger();
-
- public InetSocketAddress getAddress() {
- return inetSocketAddress;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b4aaf45..b6b9b4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -48,26 +48,13 @@
import org.apache.hyracks.control.common.ipc.CCNCFunctions.UnDeployBinaryFunction;
import org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.ipc.api.IIPCHandle;
-public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements INodeController {
- private static final Logger LOGGER = LogManager.getLogger();
+public class NodeControllerRemoteProxy implements INodeController {
+ private final IIPCHandle ipcHandle;
- public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
- super(ipc, inetSocketAddress);
- }
-
- @Override
- protected int getMaxRetries(boolean first) {
- // -1 == retry forever
- return 0;
- }
-
- @Override
- protected Logger getLogger() {
- return LOGGER;
+ public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
}
@Override
@@ -77,74 +64,78 @@
throws Exception {
StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes,
taskDescriptors, connectorPolicies, flags, jobParameters, deployedJobSpecId);
- ensureIpcHandle().send(-1, stf, null);
+ ipcHandle.send(-1, stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks);
- ensureIpcHandle().send(-1, atf, null);
+ ipcHandle.send(-1, atf, null);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status);
- ensureIpcHandle().send(-1, cjf, null);
+ ipcHandle.send(-1, cjf, null);
}
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
ReportPartitionAvailabilityFunction rpaf = new ReportPartitionAvailabilityFunction(
pid, networkAddress);
- ensureIpcHandle().send(-1, rpaf, null);
+ ipcHandle.send(-1, rpaf, null);
}
@Override
public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
- ensureIpcHandle().send(-1, rpaf, null);
+ ipcHandle.send(-1, rpaf, null);
}
@Override
public void undeployBinary(DeploymentId deploymentId) throws Exception {
UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
- ensureIpcHandle().send(-1, rpaf, null);
+ ipcHandle.send(-1, rpaf, null);
}
@Override
public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void dumpState(String stateDumpId) throws Exception {
StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
- ensureIpcHandle().send(-1, dsf, null);
+ ipcHandle.send(-1, dsf, null);
}
@Override
public void shutdown(boolean terminateNCService) throws Exception {
ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
- ensureIpcHandle().send(-1, sdrf, null);
+ ipcHandle.send(-1, sdrf, null);
}
@Override
public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
deploymentId, nodeId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
}
@Override
public void takeThreadDump(String requestId) throws Exception {
ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
- ensureIpcHandle().send(-1, fn, null);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ public InetSocketAddress getAddress() {
+ return ipcHandle.getRemoteAddress();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 01e34c1..18a6b20 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -68,7 +68,6 @@
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.work.FutureValue;
import org.apache.hyracks.control.common.work.WorkQueue;
@@ -83,6 +82,7 @@
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
+import org.apache.hyracks.ipc.api.IIPCEventListener;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -297,9 +297,10 @@
if (messagingNetManager != null) {
messagingNetManager.start();
}
- this.ccs = new ClusterControllerRemoteProxy(ipc,
+ this.ccs = new ClusterControllerRemoteProxy(
+ ipc.getHandle(
new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
- ncConfig.getClusterConnectRetries(), new IControllerRemoteProxyIPCEventListener() {
+ ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() {
@Override
public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
// we need to re-register in case of NC -> CC connection reset
@@ -310,7 +311,7 @@
throw new IPCException(e);
}
}
- });
+ }));
registerNode();
workQueue.start();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
similarity index 88%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
rename to hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
index ec4f9e4..a6ba545 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCEventListener.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.ipc;
+package org.apache.hyracks.ipc.api;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
-public interface IControllerRemoteProxyIPCEventListener {
+public interface IIPCEventListener {
default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
// no-op
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 8e38651..b36e645 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hyracks.ipc.api.IIPCEventListener;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
@@ -70,6 +71,28 @@
}
public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException {
+ return getHandle(remoteAddress, maxRetries, 0);
+ }
+
+ public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress) throws IPCException {
+ return getReconnectingHandle(remoteAddress, 1);
+ }
+
+ public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, int reconnectAttempts)
+ throws IPCException {
+ return getHandle(remoteAddress, 0, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
+ }
+
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts)
+ throws IPCException {
+ return getHandle(remoteAddress, maxRetries, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
+ }
+
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts,
+ IIPCEventListener eventListener) throws IPCException {
+ if (reconnectAttempts > 0) {
+ return new ReconnectingIPCHandle(this, eventListener, remoteAddress, maxRetries, reconnectAttempts);
+ }
try {
return cMgr.getIPCHandle(remoteAddress, maxRetries);
} catch (IOException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
similarity index 61%
copy from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
copy to hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
index ec4f9e4..156641f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/IControllerRemoteProxyIPCEventListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/NoOpIPCEventListener.java
@@ -16,22 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.ipc;
+package org.apache.hyracks.ipc.impl;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.api.IIPCEventListener;
-public interface IControllerRemoteProxyIPCEventListener {
+public class NoOpIPCEventListener implements IIPCEventListener {
+ public static final IIPCEventListener INSTANCE = new NoOpIPCEventListener();
- default void ipcHandleConnected(IIPCHandle handle) throws IPCException {
- // no-op
- }
-
- default void ipcHandleDisconnected(IIPCHandle handle) throws IPCException {
- // no-op
- }
-
- default void ipcHandleRestored(IIPCHandle handle) throws IPCException {
- // no-op
+ private NoOpIPCEventListener() {
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
new file mode 100644
index 0000000..f0da860
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.impl;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.ipc.api.IIPCEventListener;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+class ReconnectingIPCHandle implements IIPCHandle {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final IPCSystem ipc;
+ private final int reconnectAttempts;
+ private final IIPCEventListener listener;
+ private IIPCHandle delegate;
+
+ ReconnectingIPCHandle(IPCSystem ipc, IIPCEventListener listener, InetSocketAddress remoteAddress, int maxRetries,
+ int reconnectAttempts) throws IPCException {
+ this.ipc = ipc;
+ this.listener = listener;
+ this.reconnectAttempts = reconnectAttempts;
+ this.delegate = ipc.getHandle(remoteAddress, maxRetries);
+ listener.ipcHandleConnected(delegate);
+ }
+
+ @Override
+ public InetSocketAddress getRemoteAddress() {
+ return delegate.getRemoteAddress();
+ }
+
+ @Override
+ public long send(long requestId, Object payload, Exception exception) throws IPCException {
+ return ensureConnected().send(requestId, payload, exception);
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ delegate.setAttachment(attachment);
+ }
+
+ @Override
+ public Object getAttachment() {
+ return delegate.getAttachment();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return delegate.isConnected();
+ }
+
+ private IIPCHandle ensureConnected() throws IPCException {
+ if (delegate.isConnected()) {
+ return delegate;
+ }
+ listener.ipcHandleDisconnected(delegate);
+ delegate = ipc.getHandle(getRemoteAddress(), reconnectAttempts);
+ LOGGER.warn("ipcHandle " + delegate + " restored");
+ listener.ipcHandleRestored(delegate);
+
+ return delegate;
+ }
+
+}