Attempt to reconnect IPCHandle on connection failure

IPCHandles can become invalid due to network interruption or node
crash/restart.  Automatically retry connection in event of attempt
to use disconnected handle.

Change-Id: I069dcd59898021054462c8213fb623df2deec598
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1828
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 79033d8..dc7bad0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -54,8 +54,8 @@
         CCNCFunctions.NodeRegistrationResult result;
         Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
-            INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
-            NodeControllerState state = new NodeControllerState(nodeController, reg);
+            INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+            NodeControllerState state = new NodeControllerState(nc, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
             IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 4eb1732..620033c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1396,7 +1396,8 @@
         int cdid = dis.readInt();
         int senderIndex = dis.readInt();
         int receiverIndex = dis.readInt();
-        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+        PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
+                receiverIndex);
         return pid;
     }
 
@@ -1412,8 +1413,8 @@
         int aid = dis.readInt();
         int partition = dis.readInt();
         int attempt = dis.readInt();
-        TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
-                partition), attempt);
+        TaskAttemptId taId = new TaskAttemptId(
+                new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
         return taId;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 83ef32b..98d258f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,7 +18,11 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
+
+import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
@@ -33,141 +37,153 @@
 import org.apache.hyracks.control.common.job.PartitionRequest;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
-import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
-public class ClusterControllerRemoteProxy implements IClusterController {
-    private final IIPCHandle ipcHandle;
+public class ClusterControllerRemoteProxy extends ControllerRemoteProxy implements IClusterController {
+    private static final Logger LOGGER = Logger.getLogger(ClusterControllerRemoteProxy.class.getName());
 
-    public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
-        this.ipcHandle = ipcHandle;
+    private final int clusterConnectRetries;
+
+    public ClusterControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress, int clusterConnectRetries) {
+        super(ipc, inetSocketAddress);
+        this.clusterConnectRetries = clusterConnectRetries;
+    }
+
+    @Override
+    protected int getRetries(boolean first) {
+        return first ? clusterConnectRetries : 0;
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return LOGGER;
     }
 
     @Override
     public void registerNode(NodeRegistration reg) throws Exception {
-        CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
-        ipcHandle.send(-1, fn, null);
+        RegisterNodeFunction fn = new RegisterNodeFunction(reg);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void unregisterNode(String nodeId) throws Exception {
-        CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(nodeId);
-        ipcHandle.send(-1, fn, null);
+        UnregisterNodeFunction fn = new UnregisterNodeFunction(nodeId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
             throws Exception {
-        CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(jobId, taskId,
+        NotifyTaskCompleteFunction fn = new NotifyTaskCompleteFunction(jobId, taskId,
                 nodeId, statistics);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
             throws Exception {
-        CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(jobId, taskId, nodeId,
+        NotifyTaskFailureFunction fn = new NotifyTaskFailureFunction(jobId, taskId, nodeId,
                 exceptions);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
-        CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(jobId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        NotifyJobletCleanupFunction fn = new NotifyJobletCleanupFunction(jobId, nodeId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception {
-        CCNCFunctions.NotifyDeployBinaryFunction fn = new CCNCFunctions.NotifyDeployBinaryFunction(deploymentId,
-                nodeId, status);
-        ipcHandle.send(-1, fn, null);
+        NotifyDeployBinaryFunction fn = new NotifyDeployBinaryFunction(deploymentId, nodeId,
+                status);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id, hbData);
-        ipcHandle.send(-1, fn, null);
+        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
-        CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id, profiles);
-        ipcHandle.send(-1, fn, null);
+        ReportProfileFunction fn = new ReportProfileFunction(id, profiles);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
-        CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
+        RegisterPartitionProviderFunction fn = new RegisterPartitionProviderFunction(
                 partitionDescriptor);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
-        CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
+        RegisterPartitionRequestFunction fn = new RegisterPartitionRequestFunction(
                 partitionRequest);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
-        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-                                                boolean emptyResult, int partition, int nPartitions,
-                                                NetworkAddress networkAddress) throws Exception {
-        CCNCFunctions.RegisterResultPartitionLocationFunction fn =
-                new CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, orderedResult, emptyResult,
-                        partition, nPartitions, networkAddress);
-        ipcHandle.send(-1, fn, null);
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception {
+        RegisterResultPartitionLocationFunction fn = new RegisterResultPartitionLocationFunction(
+                jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
-        CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn =
-                new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(jobId, rsId, partition);
-        ipcHandle.send(-1, fn, null);
+        ReportResultPartitionWriteCompletionFunction fn = new ReportResultPartitionWriteCompletionFunction(
+                jobId, rsId, partition);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
-        CCNCFunctions.ReportResultPartitionFailureFunction fn =
-                new CCNCFunctions.ReportResultPartitionFailureFunction(jobId, rsId, partition);
-        ipcHandle.send(-1, fn, null);
+        ReportResultPartitionFailureFunction fn = new ReportResultPartitionFailureFunction(
+                jobId, rsId, partition);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void notifyDistributedJobFailure(JobId jobId, String nodeId) throws Exception {
-        CCNCFunctions.ReportDistributedJobFailureFunction fn =
-                new CCNCFunctions.ReportDistributedJobFailureFunction(jobId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ReportDistributedJobFailureFunction fn = new ReportDistributedJobFailureFunction(
+                jobId, nodeId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void getNodeControllerInfos() throws Exception {
-        ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
+        ensureIpcHandle().send(-1, new GetNodeControllersInfoFunction(), null);
     }
 
     @Override
     public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception {
-        CCNCFunctions.StateDumpResponseFunction fn =
-                new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId, state);
-        ipcHandle.send(-1, fn, null);
+        StateDumpResponseFunction fn = new StateDumpResponseFunction(nodeId, stateDumpId,
+                state);
+        ensureIpcHandle().send(-1, fn, null);
     }
+
     @Override
-    public void notifyShutdown(String nodeId) throws Exception{
-        CCNCFunctions.ShutdownResponseFunction sdrf = new CCNCFunctions.ShutdownResponseFunction(nodeId);
-        ipcHandle.send(-1, sdrf, null);
+    public void notifyShutdown(String nodeId) throws Exception {
+        ShutdownResponseFunction sdrf = new ShutdownResponseFunction(nodeId);
+        ensureIpcHandle().send(-1, sdrf, null);
     }
 
     @Override
     public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception {
-        CCNCFunctions.ThreadDumpResponseFunction tdrf =
-                new CCNCFunctions.ThreadDumpResponseFunction(nodeId, requestId, threadDumpJSON);
-        ipcHandle.send(-1, tdrf, null);
+        ThreadDumpResponseFunction tdrf = new ThreadDumpResponseFunction(nodeId, requestId,
+                threadDumpJSON);
+        ensureIpcHandle().send(-1, tdrf, null);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
new file mode 100644
index 0000000..44b0e4a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.ipc;
+
+import java.net.InetSocketAddress;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+public abstract class ControllerRemoteProxy {
+    protected final IPCSystem ipc;
+    protected final InetSocketAddress inetSocketAddress;
+    private IIPCHandle ipcHandle;
+
+    protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
+        this.ipc = ipc;
+        this.inetSocketAddress = inetSocketAddress;
+    }
+
+    protected IIPCHandle ensureIpcHandle() throws IPCException {
+        final boolean first = ipcHandle == null;
+        if (first || !ipcHandle.isConnected()) {
+            if (!first) {
+                getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+            }
+            ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
+            if (!first && ipcHandle.isConnected()) {
+                getLogger().warning("ipcHandle " + ipcHandle + " restored");
+            }
+        }
+        return ipcHandle;
+    }
+
+    protected abstract int getRetries(boolean first);
+
+    protected abstract Logger getLogger();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 2a8464e..68a5b76 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import static org.apache.hyracks.control.common.ipc.CCNCFunctions.*;
+
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -34,89 +38,99 @@
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.base.INodeController;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
-public class NodeControllerRemoteProxy implements INodeController {
-    private final IIPCHandle ipcHandle;
+public class NodeControllerRemoteProxy extends ControllerRemoteProxy implements INodeController {
+    private static final Logger LOGGER = Logger.getLogger(NodeControllerRemoteProxy.class.getName());
 
-    public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
-        this.ipcHandle = ipcHandle;
+    public NodeControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress) {
+        super(ipc, inetSocketAddress);
+    }
+
+    @Override
+    protected int getRetries(boolean first) {
+        return 0;
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return LOGGER;
     }
 
     @Override
     public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
             List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
             Set<JobFlag> flags) throws Exception {
-        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
+        StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes,
                 taskDescriptors, connectorPolicies, flags);
-        ipcHandle.send(-1, stf, null);
+        ensureIpcHandle().send(-1, stf, null);
     }
 
     @Override
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
-        CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
-        ipcHandle.send(-1, atf, null);
+        AbortTasksFunction atf = new AbortTasksFunction(jobId, tasks);
+        ensureIpcHandle().send(-1, atf, null);
     }
 
     @Override
     public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
-        CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status);
-        ipcHandle.send(-1, cjf, null);
+        CleanupJobletFunction cjf = new CleanupJobletFunction(jobId, status);
+        ensureIpcHandle().send(-1, cjf, null);
     }
 
     @Override
     public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
-        CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
+        ReportPartitionAvailabilityFunction rpaf = new ReportPartitionAvailabilityFunction(
                 pid, networkAddress);
-        ipcHandle.send(-1, rpaf, null);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
-        CCNCFunctions.DeployBinaryFunction rpaf = new CCNCFunctions.DeployBinaryFunction(deploymentId, binaryURLs);
-        ipcHandle.send(-1, rpaf, null);
+        DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void undeployBinary(DeploymentId deploymentId) throws Exception {
-        CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId);
-        ipcHandle.send(-1, rpaf, null);
+        UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
+        ensureIpcHandle().send(-1, rpaf, null);
     }
 
     @Override
     public void distributeJob(JobId jobId, byte[] planBytes) throws Exception {
-        CCNCFunctions.DistributeJobFunction fn = new CCNCFunctions.DistributeJobFunction(jobId, planBytes);
-        ipcHandle.send(-1, fn, null);
+        DistributeJobFunction fn = new DistributeJobFunction(jobId, planBytes);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void destroyJob(JobId jobId) throws Exception {
-        CCNCFunctions.DestroyJobFunction fn = new CCNCFunctions.DestroyJobFunction(jobId);
-        ipcHandle.send(-1, fn, null);
+        DestroyJobFunction fn = new DestroyJobFunction(jobId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void dumpState(String stateDumpId) throws Exception {
-        CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
-        ipcHandle.send(-1, dsf, null);
+        StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
+        ensureIpcHandle().send(-1, dsf, null);
     }
 
     @Override
     public void shutdown(boolean terminateNCService) throws Exception {
-        CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction(terminateNCService);
-        ipcHandle.send(-1, sdrf, null);
+        ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
+        ensureIpcHandle().send(-1, sdrf, null);
     }
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
-        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data,
                 deploymentId, nodeId);
-        ipcHandle.send(-1, fn, null);
+        ensureIpcHandle().send(-1, fn, null);
     }
 
     @Override
     public void takeThreadDump(String requestId) throws Exception {
-        CCNCFunctions.ThreadDumpRequestFunction fn = new CCNCFunctions.ThreadDumpRequestFunction(requestId);
-        ipcHandle.send(-1, fn, null);
+        ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
+        ensureIpcHandle().send(-1, fn, null);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2fe0e27..0587a55 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -80,7 +80,6 @@
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -279,10 +278,9 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        IIPCHandle ccIPCHandle = ipc.getHandle(
+        this.ccs = new ClusterControllerRemoteProxy(ipc,
                 new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
                 ncConfig.getClusterConnectRetries());
-        this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
             gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index fe2bcae..9efd70e 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -99,7 +99,7 @@
         while (true) {
             synchronized (this) {
                 handle = ipcHandleMap.get(remoteAddress);
-                if (handle == null) {
+                if (handle == null || !handle.isConnected()) {
                     handle = new IPCHandle(system, remoteAddress);
                     pendingConnections.add(handle);
                     networkThread.selector.wakeup();