Attempt to reconnect IPCHandle on connection failure
IPCHandles can become invalid due to network interruption or node
crash/restart. Automatically retry connection in event of attempt
to use disconnected handle.
Change-Id: I069dcd59898021054462c8213fb623df2deec598
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1828
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 79033d8..dc7bad0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -54,8 +54,8 @@
CCNCFunctions.NodeRegistrationResult result;
Map<IOption, Object> ncConfiguration = new HashMap<>();
try {
- INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
- NodeControllerState state = new NodeControllerState(nodeController, reg);
+ INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+ NodeControllerState state = new NodeControllerState(nc, reg);
INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4eb1732..620033c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1396,7 +1396,8 @@
int cdid = dis.readInt();
int senderIndex = dis.readInt();
int receiverIndex = dis.readInt();
- PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+ PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
+ receiverIndex);
return pid;
}
@@ -1412,8 +1413,8 @@
int aid = dis.readInt();
int partition = dis.readInt();
int attempt = dis.readInt();
- TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
- partition), attempt);
+ TaskAttemptId taId = new TaskAttemptId(
+ new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
return taId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 83ef32b..98d258f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,11 @@
*/
package org.apache.hyracks.control.common.ipc;
+import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
+
+import java.net.InetSocketAddress;
import java.util.List;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -33,141 +37,153 @@
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
-import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.impl.IPCSystem;
-public class ClusterControllerRemoteProxy implements IClusterController {
- private final IIPCHandle ipcHandle;
+public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implements IClusterController {
+ private static final Logger LOGGER = Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
- public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
- this.ipcHandle = ipcHandle;
+ private final int clusterConnectRetries;
+
+ public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries) {
+ super(ipc, inetSocketAddress);
+ this.clusterConnectRetries = clusterConnectRetries;
+ }
+
+ @Override
+ protected int getRetries(boolean first) {
+ return first ? clusterConnectRetries : 0;
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return LOGGER;
}
@Override
public void registerNode(NodeRegistration reg) throws Exception {
- CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
- ipcHandle.send(-1, fn, null);
+ RegisterNodeFunction fn = new RegisterNodeFunction(reg);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void unregisterNode(String nodeId) throws Exception {
- CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
- ipcHandle.send(-1, fn, null);
+ UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+ NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, taskId,
nodeId, statistics);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
throws Exception {
- CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+ NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, taskId, nodeId,
exceptions);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
- ipcHandle.send(-1, fn, null);
+ NotifyJobletCleanupFunction fn = new NotifyJobletCleanupFunction(jobId, nodeId);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception {
- CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
- nodeId, status);
- ipcHandle.send(-1, fn, null);
+ NotifyDeployBinaryFunction fn = new NotifyDeployBinaryFunction(deploymentId, nodeId,
+ status);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
- ipcHandle.send(-1, fn, null);
+ NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
- ipcHandle.send(-1, fn, null);
+ ReportProfileFunction fn = new ReportProfileFunction(id, profiles);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
- CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
+ RegisterPartitionProviderFunction fn = new RegisterPartitionProviderFunction(
partitionDescriptor);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
- CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
+ RegisterPartitionRequestFunction fn = new RegisterPartitionRequestFunction(
partitionRequest);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
- CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+ SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
deploymentId, nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- boolean emptyResult, int partition, int nPartitions,
- NetworkAddress networkAddress) throws Exception {
- CCNCFunctions.RegisterResultPartitionLocationFunction fn =
- new CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, orderedResult, emptyResult,
- partition, nPartitions, networkAddress);
- ipcHandle.send(-1, fn, null);
+ boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception {
+ RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(
+ jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn =
- new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId, partition);
- ipcHandle.send(-1, fn, null);
+ ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction(
+ jobId, rsId, partition);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
- CCNCFunctions.ReportResultPartitionFailureFunction fn =
- new CCNCFunctions.ReportResultPartitionFailureFunction(jobId, rsId, partition);
- ipcHandle.send(-1, fn, null);
+ ReportResultPartitionFailureFunction fn = new ReportResultPartitionFailureFunction(
+ jobId, rsId, partition);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
- CCNCFunctions.ReportDistributedJobFailureFunction fn =
- new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId);
- ipcHandle.send(-1, fn, null);
+ ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction(
+ jobId, nodeId);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void getNodeControllerInfos() throws Exception {
- ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
+ ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null);
}
@Override
public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception {
- CCNCFunctions.StateDumpResponseFunction fn =
- new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId, state);
- ipcHandle.send(-1, fn, null);
+ StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, stateDumpId,
+ state);
+ ensureIpcHandle().send(-1, fn, null);
}
+
@Override
- public void notifyShutdown(String nodeId) throws Exception{
- CCNCFunctions.ShutdownResponseFunction sdrf = new CCNCFunctions.ShutdownResponseFunction(nodeId);
- ipcHandle.send(-1, sdrf, null);
+ public void notifyShutdown(String nodeId) throws Exception {
+ ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId);
+ ensureIpcHandle().send(-1, sdrf, null);
}
@Override
public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception {
- CCNCFunctions.ThreadDumpResponseFunction tdrf =
- new CCNCFunctions.ThreadDumpResponseFunction(nodeId, requestId, threadDumpJSON);
- ipcHandle.send(-1, tdrf, null);
+ ThreadDumpResponseFunction tdrf = new ThreadDumpResponseFunction(nodeId, requestId,
+ threadDumpJSON);
+ ensureIpcHandle().send(-1, tdrf, null);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
new file mode 100644
index 0000000..44b0e4a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.ipc;
+
+import java.net.InetSocketAddress;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+public abstract class ControllerRemoteProxy {
+ protected final IPCSystem ipc;
+ protected final InetSocketAddress inetSocketAddress;
+ private IIPCHandle ipcHandle;
+
+ protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
+ this.ipc = ipc;
+ this.inetSocketAddress = inetSocketAddress;
+ }
+
+ protected IIPCHandle ensureIpcHandle() throws IPCException {
+ final boolean first = ipcHandle == null;
+ if (first || !ipcHandle.isConnected()) {
+ if (!first) {
+ getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+ }
+ ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
+ if (!first && ipcHandle.isConnected()) {
+ getLogger().warning("ipcHandle " + ipcHandle + " restored");
+ }
+ }
+ return ipcHandle;
+ }
+
+ protected abstract int getRetries(boolean first);
+
+ protected abstract Logger getLogger();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..68a5b76 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,10 +18,14 @@
*/
package org.apache.hyracks.control.common.ipc;
+import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
+
+import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -34,89 +38,99 @@
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.impl.IPCSystem;
-public class NodeControllerRemoteProxy implements INodeController {
- private final IIPCHandle ipcHandle;
+public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements INodeController {
+ private static final Logger LOGGER = Logger.getLogger(NodeControllerRemoteProxy.class.getName());
- public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
- this.ipcHandle = ipcHandle;
+ public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
+ super(ipc, inetSocketAddress);
+ }
+
+ @Override
+ protected int getRetries(boolean first) {
+ return 0;
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return LOGGER;
}
@Override
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
Set<JobFlag> flags) throws Exception {
- CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
+ StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes,
taskDescriptors, connectorPolicies, flags);
- ipcHandle.send(-1, stf, null);
+ ensureIpcHandle().send(-1, stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
- ipcHandle.send(-1, atf, null);
+ AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks);
+ ensureIpcHandle().send(-1, atf, null);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
- CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status);
- ipcHandle.send(-1, cjf, null);
+ CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status);
+ ensureIpcHandle().send(-1, cjf, null);
}
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
- CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
+ ReportPartitionAvailabilityFunction rpaf = new ReportPartitionAvailabilityFunction(
pid, networkAddress);
- ipcHandle.send(-1, rpaf, null);
+ ensureIpcHandle().send(-1, rpaf, null);
}
@Override
public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
- CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
- ipcHandle.send(-1, rpaf, null);
+ DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
+ ensureIpcHandle().send(-1, rpaf, null);
}
@Override
public void undeployBinary(DeploymentId deploymentId) throws Exception {
- CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId);
- ipcHandle.send(-1, rpaf, null);
+ UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
+ ensureIpcHandle().send(-1, rpaf, null);
}
@Override
public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
- CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes);
- ipcHandle.send(-1, fn, null);
+ DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void destroyJob(JobId jobId) throws Exception {
- CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId);
- ipcHandle.send(-1, fn, null);
+ DestroyJobFunction fn = new DestroyJobFunction(jobId);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void dumpState(String stateDumpId) throws Exception {
- CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
- ipcHandle.send(-1, dsf, null);
+ StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
+ ensureIpcHandle().send(-1, dsf, null);
}
@Override
public void shutdown(boolean terminateNCService) throws Exception {
- CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction(terminateNCService);
- ipcHandle.send(-1, sdrf, null);
+ ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
+ ensureIpcHandle().send(-1, sdrf, null);
}
@Override
public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
- CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+ SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
deploymentId, nodeId);
- ipcHandle.send(-1, fn, null);
+ ensureIpcHandle().send(-1, fn, null);
}
@Override
public void takeThreadDump(String requestId) throws Exception {
- CCNCFunctions.ThreadDumpRequestFunction fn = new CCNCFunctions.ThreadDumpRequestFunction(requestId);
- ipcHandle.send(-1, fn, null);
+ ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
+ ensureIpcHandle().send(-1, fn, null);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2fe0e27..0587a55 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -80,7 +80,6 @@
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -279,10 +278,9 @@
if (messagingNetManager != null) {
messagingNetManager.start();
}
- IIPCHandle ccIPCHandle = ipc.getHandle(
+ this.ccs = new ClusterControllerRemoteProxy(ipc,
new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
ncConfig.getClusterConnectRetries());
- this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index fe2bcae..9efd70e 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -99,7 +99,7 @@
while (true) {
synchronized (this) {
handle = ipcHandleMap.get(remoteAddress);
- if (handle == null) {
+ if (handle == null || !handle.isConnected()) {
handle = new IPCHandle(system, remoteAddress);
pendingConnections.add(handle);
networkThread.selector.wakeup();