diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 99747fa..69601cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -148,4 +149,14 @@
         }
         return msgCache;
     }
+
+    public boolean matches(String component, int errorCode) {
+        Objects.requireNonNull(component, "component");
+        return component.equals(this.component) && errorCode == this.errorCode;
+    }
+
+    @Override
+    public String toString() {
+        return getLocalizedMessage();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 7e5d22c..85aa8ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -74,7 +74,8 @@
                 break;
             case NODE_HEARTBEAT:
                 CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
-                ccs.getExecutor().execute(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData()));
+                ccs.getExecutor().execute(
+                        new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData(), nhf.getNcAddress()));
                 break;
             case NOTIFY_JOBLET_CLEANUP:
                 CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
index 8e7faff..401360c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractHeartbeatWork.java
@@ -27,9 +27,9 @@
 
 public abstract class AbstractHeartbeatWork extends SynchronizableWork {
 
-    private final ClusterControllerService ccs;
-    private final String nodeId;
-    private final HeartbeatData hbData;
+    protected final ClusterControllerService ccs;
+    protected final String nodeId;
+    protected final HeartbeatData hbData;
 
     public AbstractHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
         this.ccs = ccs;
@@ -38,7 +38,7 @@
     }
 
     @Override
-    public void doRun() {
+    public void doRun() throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (state != null) {
@@ -51,6 +51,6 @@
         runWork();
     }
 
-    public abstract void runWork();
+    public abstract void runWork() throws Exception;
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index d0c6567..b9053ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -36,17 +36,13 @@
 import org.apache.hyracks.control.cc.job.TaskClusterAttempt;
 
 public abstract class AbstractTaskLifecycleWork extends AbstractHeartbeatWork {
-    protected final ClusterControllerService ccs;
     protected final JobId jobId;
     protected final TaskAttemptId taId;
-    protected final String nodeId;
 
     public AbstractTaskLifecycleWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
         this.jobId = jobId;
         this.taId = taId;
-        this.nodeId = nodeId;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 392046d..f2aa1f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -35,15 +35,11 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private byte[] message;
     private DeploymentId deploymentId;
-    private String nodeId;
-    private ClusterControllerService ccs;
 
     public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
             String nodeId) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
         this.deploymentId = deploymentId;
-        this.nodeId = nodeId;
         this.message = message;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index cc37f9c..727793b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -37,15 +37,11 @@
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private ClusterControllerService ccs;
     private JobId jobId;
-    private String nodeId;
 
     public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
         this.jobId = jobId;
-        this.nodeId = nodeId;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
index 5c98035..b772ef9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -18,19 +18,39 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.logging.log4j.Level;
 
 public class NodeHeartbeatWork extends AbstractHeartbeatWork {
 
-    public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+    private final InetSocketAddress ncAddress;
+
+    public NodeHeartbeatWork(ClusterControllerService ccs, String nodeId, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
         super(ccs, nodeId, hbData);
+        this.ncAddress = ncAddress;
     }
 
     @Override
-    public void runWork() {
-
+    public void runWork() throws Exception {
+        INodeManager nodeManager = ccs.getNodeManager();
+        final NodeControllerState ncState = nodeManager.getNodeControllerState(nodeId);
+        if (ncState != null) {
+            ncState.getNodeController().heartbeatAck(ccs.getCcId(), null);
+        } else {
+            // unregistered nc- let him know
+            NodeControllerRemoteProxy nc =
+                    new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(ncAddress));
+            nc.heartbeatAck(ccs.getCcId(), HyracksDataException.create(ErrorCode.NO_SUCH_NODE, nodeId));
+        }
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index 62c19bb..80aae39 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -31,16 +31,12 @@
  */
 public class NotifyDeployBinaryWork extends AbstractHeartbeatWork {
 
-    private final ClusterControllerService ccs;
-    private final String nodeId;
     private final DeploymentId deploymentId;
     private DeploymentStatus deploymentStatus;
 
     public NotifyDeployBinaryWork(ClusterControllerService ccs, DeploymentId deploymentId, String nodeId,
             DeploymentStatus deploymentStatus) {
         super(ccs, nodeId, null);
-        this.ccs = ccs;
-        this.nodeId = nodeId;
         this.deploymentId = deploymentId;
         this.deploymentStatus = deploymentStatus;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 00693df..fe33bc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -49,10 +49,16 @@
     @Override
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
-        LOGGER.warn("Registering node: {}", id);
+        LOGGER.info("registering node: {}", id);
         NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
                 ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
         INodeManager nodeManager = ccs.getNodeManager();
+        NodeParameters params = new NodeParameters();
+        params.setClusterControllerInfo(ccs.getClusterControllerInfo());
+        params.setDistributedState(ccs.getContext().getDistributedState());
+        params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
+        params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
+        params.setRegistrationId(registrationId);
         try {
             NodeControllerState state = new NodeControllerState(nc, reg);
             nodeManager.addNode(id, state);
@@ -61,21 +67,13 @@
             for (IOption option : cfg.getOptions()) {
                 ncConfiguration.put(option, cfg.get(option));
             }
-            LOGGER.warn("Registered node: {}", id);
-            NodeParameters params = new NodeParameters();
-            params.setClusterControllerInfo(ccs.getClusterControllerInfo());
-            params.setDistributedState(ccs.getContext().getDistributedState());
-            params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
-            params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
-            params.setRegistrationId(registrationId);
-            LOGGER.warn("sending registration response to node {}", id);
+            LOGGER.info("registered node: {}", id);
             nc.sendRegistrationResult(params, null);
-            LOGGER.warn("notifying node {} joined", id);
             ccs.getContext().notifyNodeJoin(id, ncConfiguration);
         } catch (Exception e) {
-            LOGGER.error("Node {} registration failed", id, e);
+            LOGGER.error("node {} registration failed", id, e);
             nodeManager.removeNode(id);
-            nc.sendRegistrationResult(null, e);
+            nc.sendRegistrationResult(params, e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index fbaff55..c811169 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.base;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -54,7 +55,7 @@
 
     void notifyShutdown(String nodeId) throws Exception;
 
-    void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+    void nodeHeartbeat(String id, HeartbeatData hbData, InetSocketAddress ncAddress) throws Exception;
 
     void reportProfile(String id, List<JobProfile> profiles) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index d7941f2..42a0d66 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -93,4 +94,13 @@
      * @throws IPCException
      */
     void ping(CcId ccId) throws IPCException;
+
+    /**
+     * Delivers a response to a heartbeat delivered to this {@link CcId}
+     *
+     * @param ccId
+     * @param e
+     * @throws IPCException
+     */
+    void heartbeatAck(CcId ccId, HyracksDataException e) throws IPCException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index b49dc20..1dae48c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -176,7 +176,7 @@
     }
 
     public synchronized void ensureNode(String nodeId) {
-        LOGGER.debug("ensureNode: " + nodeId);
+        LOGGER.trace("+ensureNode: {}", nodeId);
         Map<IOption, Object> nodeDefinedMap =
                 nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
         Map<IOption, Object> nodeDefaultMap =
@@ -186,14 +186,14 @@
     }
 
     public synchronized void forgetNode(String nodeId) {
-        LOGGER.debug("forgetNode: " + nodeId);
+        LOGGER.trace("+forgetNode: {}", nodeId);
         nodeSpecificDefinedMap.remove(nodeId);
         nodeSpecificDefaultMap.remove(nodeId);
         nodeEffectiveMaps.remove(nodeId);
     }
 
     private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
-        LOGGER.debug("createNodeSpecificMap: " + nodeId);
+        LOGGER.trace("+createNodeSpecificMap: {}", nodeId);
         return Collections.synchronizedMap(new HashMap<>());
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 2522ebe..1616343 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -24,8 +24,11 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -47,6 +50,7 @@
 import org.apache.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -81,6 +85,7 @@
         NOTIFY_TASK_COMPLETE,
         NOTIFY_TASK_FAILURE,
         NODE_HEARTBEAT,
+        NODE_HEARTBEAT_ACK,
         REPORT_PROFILE,
         REGISTER_PARTITION_PROVIDER,
         REGISTER_PARTITION_REQUEST,
@@ -383,10 +388,12 @@
 
         private final String nodeId;
         private final HeartbeatData hbData;
+        private final InetSocketAddress ncAddress;
 
-        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+        public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData, InetSocketAddress ncAddress) {
             this.nodeId = nodeId;
             this.hbData = hbData;
+            this.ncAddress = ncAddress;
         }
 
         @Override
@@ -402,21 +409,27 @@
             return hbData;
         }
 
+        public InetSocketAddress getNcAddress() {
+            return ncAddress;
+        }
+
         public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
             ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
-            DataInputStream dis = new DataInputStream(bais);
+            ObjectInputStream dis = new ObjectInputStream(bais);
 
-            String nodeId = dis.readUTF();
             HeartbeatData hbData = new HeartbeatData();
             hbData.readFields(dis);
-            return new NodeHeartbeatFunction(nodeId, hbData);
+            String nodeId = dis.readUTF();
+            InetSocketAddress ncAddress = (InetSocketAddress) dis.readObject();
+            return new NodeHeartbeatFunction(nodeId, hbData, ncAddress);
         }
 
         public static void serialize(OutputStream out, Object object) throws Exception {
             NodeHeartbeatFunction fn = (NodeHeartbeatFunction) object;
-            DataOutputStream dos = new DataOutputStream(out);
-            dos.writeUTF(fn.nodeId);
+            ObjectOutputStream dos = new ObjectOutputStream(out);
             fn.hbData.write(dos);
+            dos.writeUTF(fn.nodeId);
+            dos.writeObject(fn.ncAddress);
         }
     }
 
@@ -1332,6 +1345,25 @@
         }
     }
 
+    public static class NodeHeartbeatAckFunction extends CCIdentifiedFunction {
+        private static final long serialVersionUID = 1L;
+        private final HyracksDataException exception;
+
+        public NodeHeartbeatAckFunction(CcId ccId, HyracksDataException e) {
+            super(ccId);
+            exception = e;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.NODE_HEARTBEAT_ACK;
+        }
+
+        public HyracksDataException getException() {
+            return exception;
+        }
+    }
+
     public static class ShutdownRequestFunction extends CCIdentifiedFunction {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 06904c2..13a08b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -101,8 +102,8 @@
     }
 
     @Override
-    public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
-        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData);
+    public void nodeHeartbeat(String id, HeartbeatData hbData, InetSocketAddress ncAddress) throws Exception {
+        NodeHeartbeatFunction fn = new NodeHeartbeatFunction(id, hbData, ncAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index dd10020..d32ee32 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -153,6 +154,11 @@
         ipcHandle.send(-1, new CCNCFunctions.PingFunction(ccId), null);
     }
 
+    @Override
+    public void heartbeatAck(CcId ccId, HyracksDataException e) throws IPCException {
+        ipcHandle.send(-1, new CCNCFunctions.NodeHeartbeatAckFunction(ccId, e), null);
+    }
+
     public InetSocketAddress getAddress() {
         return ipcHandle.getRemoteAddress();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 77623c2..d1f7d5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -36,13 +36,15 @@
     private static final long REGISTRATION_RESPONSE_POLL_PERIOD = TimeUnit.SECONDS.toMillis(1);
 
     private final IClusterController ccs;
+    private final InetSocketAddress ccAddress;
     private boolean registrationPending;
     private boolean registrationCompleted;
     private Exception registrationException;
     private NodeParameters nodeParameters;
 
-    CcConnection(IClusterController ccs) {
+    CcConnection(IClusterController ccs, InetSocketAddress ccAddress) {
         this.ccs = ccs;
+        this.ccAddress = ccAddress;
     }
 
     @Override
@@ -86,19 +88,17 @@
         return nodeParameters;
     }
 
-    public synchronized void notifyConnectionRestored(NodeControllerService ncs, InetSocketAddress ccAddress)
-            throws InterruptedException {
-        if (registrationCompleted) {
-            registrationCompleted = false;
-            ncs.getExecutor().submit(() -> {
-                try {
-                    return ncs.registerNode(this, ccAddress);
-                } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Failed registering with cc", e);
-                    throw new IllegalStateException(e);
-                }
-            });
-        }
+    public synchronized void forceReregister(NodeControllerService ncs) throws InterruptedException {
+        registrationCompleted = false;
+        ncs.getExecutor().submit(() -> {
+            try {
+                return ncs.registerNode(this);
+            } catch (Exception e) {
+                LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+                throw new IllegalStateException(e);
+            }
+        });
+
         while (!registrationCompleted) {
             wait();
         }
@@ -108,4 +108,8 @@
         registrationCompleted = true;
         notifyAll();
     }
+
+    public InetSocketAddress getCcAddress() {
+        return ccAddress;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index cdc16fa..3bc9710 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -20,6 +20,8 @@
 
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
+import org.apache.hyracks.control.nc.task.HeartbeatAckTask;
+import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.task.ShutdownTask;
 import org.apache.hyracks.control.nc.task.ThreadDumpTask;
 import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
@@ -28,7 +30,6 @@
 import org.apache.hyracks.control.nc.work.CleanupJobletWork;
 import org.apache.hyracks.control.nc.work.DeployBinaryWork;
 import org.apache.hyracks.control.nc.work.DeployJobSpecWork;
-import org.apache.hyracks.control.nc.task.PingTask;
 import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import org.apache.hyracks.control.nc.work.StartTasksWork;
 import org.apache.hyracks.control.nc.work.StateDumpWork;
@@ -139,6 +140,11 @@
                 ncs.getExecutor().submit(new PingTask(ncs, pcf.getCcId()));
                 return;
 
+            case NODE_HEARTBEAT_ACK:
+                final CCNCFunctions.NodeHeartbeatAckFunction nbaf = (CCNCFunctions.NodeHeartbeatAckFunction) fn;
+                ncs.getExecutor().submit(new HeartbeatAckTask(ncs, nbaf.getCcId(), nbaf.getException()));
+                return;
+
             default:
                 throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a189ac5..653d6e01 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -59,7 +59,6 @@
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.NodeControllerData;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
@@ -75,7 +74,7 @@
 import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
 import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask;
-import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask;
+import org.apache.hyracks.control.nc.heartbeat.HeartbeatManager;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
@@ -144,7 +143,7 @@
 
     private ExecutorService executor;
 
-    private Map<CcId, Thread> heartbeatThreads = new ConcurrentHashMap<>();
+    private Map<CcId, HeartbeatManager> heartbeatManagers = new ConcurrentHashMap<>();
 
     private Map<CcId, Timer> ccTimers = new ConcurrentHashMap<>();
 
@@ -348,7 +347,7 @@
                     // we need to re-register in case of NC -> CC connection reset
                     final CcConnection ccConnection = getCcConnection(ccAddressMap.get(ccAddress));
                     try {
-                        ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+                        ccConnection.forceReregister(NodeControllerService.this);
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
                         throw new IPCException(e);
@@ -357,7 +356,7 @@
             };
             ClusterControllerRemoteProxy ccProxy = new ClusterControllerRemoteProxy(
                     ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
-            return registerNode(new CcConnection(ccProxy), ccAddress);
+            return registerNode(new CcConnection(ccProxy, ccAddress));
         }
     }
 
@@ -395,8 +394,10 @@
                         () -> String.valueOf(e));
             }
             getWorkQueue().scheduleAndSync(new AbortAllJobsWork(this, ccId));
-            Thread hbThread = heartbeatThreads.remove(ccId);
-            hbThread.interrupt();
+            HeartbeatManager hbMgr = heartbeatManagers.remove(ccId);
+            if (hbMgr != null) {
+                hbMgr.shutdown();
+            }
             Timer ccTimer = ccTimers.remove(ccId);
             if (ccTimer != null) {
                 ccTimer.cancel();
@@ -406,13 +407,13 @@
         }
     }
 
-    public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+    public CcId registerNode(CcConnection ccc) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
         int registrationId = nextRegistrationId.incrementAndGet();
         pendingRegistrations.put(registrationId, ccc);
         CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
         ccMap.put(ccId, ccc);
-        ccAddressMap.put(ccAddress, ccId);
+        ccAddressMap.put(ccc.getCcAddress(), ccId);
         Serializable distributedState = ccc.getNodeParameters().getDistributedState();
         if (distributedState != null) {
             getDistributedState().put(ccId, distributedState);
@@ -420,15 +421,8 @@
         IClusterController ccs = ccc.getClusterControllerService();
         NodeParameters nodeParameters = ccc.getNodeParameters();
         // Start heartbeat generator.
-        if (!heartbeatThreads.containsKey(ccId)) {
-            Thread heartbeatThread = new Thread(
-                    new HeartbeatTask(getId(), hbTask.getHeartbeatData(), ccs, nodeParameters.getHeartbeatPeriod()),
-                    id + "-Heartbeat");
-            heartbeatThread.setPriority(Thread.MAX_PRIORITY);
-            heartbeatThread.setDaemon(true);
-            heartbeatThread.start();
-            heartbeatThreads.put(ccId, heartbeatThread);
-        }
+        heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
+                nodeRegistration.getNodeControllerAddress()));
         if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
             Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
@@ -506,10 +500,7 @@
              * Stop heartbeats only after NC has stopped to avoid false node failure detection
              * on CC if an NC takes a long time to stop.
              */
-            heartbeatThreads.values().parallelStream().forEach(t -> {
-                t.interrupt();
-                InvokeUtil.doUninterruptibly(() -> t.join(1000));
-            });
+            heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown);
             synchronized (ccLock) {
                 ccMap.values().parallelStream().forEach(cc -> {
                     try {
@@ -673,6 +664,14 @@
         return application.getApplicationContext();
     }
 
+    public HeartbeatManager getHeartbeatManager(CcId ccId) {
+        return heartbeatManagers.get(ccId);
+    }
+
+    public NodeRegistration getNodeRegistration() {
+        return nodeRegistration;
+    }
+
     private class ProfileDumpTask extends TimerTask {
         private final IClusterController cc;
         private final CcId ccId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
index 81ceea5..cd051e3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
@@ -33,7 +33,6 @@
 import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
new file mode 100644
index 0000000..0ea6399
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.heartbeat;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.nc.CcConnection;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HeartbeatManager {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcConnection ccc;
+    private final Thread hbThread;
+    private final CcId ccId;
+
+    private HeartbeatManager(NodeControllerService ncs, CcConnection ccc, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
+        this.ncs = ncs;
+        this.ccc = ccc;
+        hbThread = new Thread(new HeartbeatTask(ncs.getId(), hbData, ccc.getClusterControllerService(),
+                ccc.getNodeParameters().getHeartbeatPeriod(), ncAddress), ncs.getId() + "-Heartbeat");
+        hbThread.setPriority(Thread.MAX_PRIORITY);
+        hbThread.setDaemon(true);
+        ccId = ccc.getCcId();
+    }
+
+    public static HeartbeatManager init(NodeControllerService ncs, CcConnection ccc, HeartbeatData hbData,
+            InetSocketAddress ncAddress) {
+        HeartbeatManager hbMgr = new HeartbeatManager(ncs, ccc, hbData, ncAddress);
+        hbMgr.start();
+        return hbMgr;
+    }
+
+    public void shutdown() {
+        hbThread.interrupt();
+    }
+
+    public void start() {
+        hbThread.start();
+    }
+
+    public void notifyAck(HyracksDataException exception) {
+        // TODO: we should also reregister in case of no ack
+        LOGGER.debug("ack rec'd from {} w/ exception: {}", ccId::toString, () -> String.valueOf(exception));
+        if (exception != null && exception.matches(ErrorCode.HYRACKS, ErrorCode.NO_SUCH_NODE)) {
+            LOGGER.info("{} indicates it does not recognize us; force a reconnect", ccId);
+            try {
+                ccc.forceReregister(ncs);
+            } catch (Exception e) {
+                LOGGER.warn("ignoring exception attempting to reregister with {}", ccId, e);
+            }
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
index 6d08fb1..4af7e4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.nc.heartbeat;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -36,12 +37,15 @@
     private final Semaphore delayBlock = new Semaphore(0);
     private final IClusterController cc;
     private final long heartbeatPeriodNanos;
+    private final InetSocketAddress ncAddress;
 
-    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController cc, long heartbeatPeriod) {
+    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController cc, long heartbeatPeriod,
+            InetSocketAddress ncAddress) {
         this.ncId = ncId;
         this.hbData = hbData;
         this.cc = cc;
         this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
+        this.ncAddress = ncAddress;
     }
 
     @Override
@@ -67,18 +71,15 @@
     private boolean execute() throws InterruptedException {
         try {
             synchronized (hbData) {
-                cc.nodeHeartbeat(ncId, hbData);
+                cc.nodeHeartbeat(ncId, hbData, ncAddress);
             }
             LOGGER.trace("Successfully sent heartbeat");
             return true;
         } catch (InterruptedException e) {
             throw e;
         } catch (Exception e) {
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will retry after 1s", e);
-            } else {
-                LOGGER.log(Level.ERROR, "Exception sending heartbeat; will retry after 1s: " + e.toString());
-            }
+            LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will retry after 1s", e);
+            LOGGER.log(Level.WARN, "Exception sending heartbeat; will retry after 1s: " + e.toString());
             return false;
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
new file mode 100644
index 0000000..f43c029
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/HeartbeatAckTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.task;
+
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HeartbeatAckTask implements Runnable {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final NodeControllerService ncs;
+    private final CcId ccId;
+    private final HyracksDataException exception;
+
+    public HeartbeatAckTask(NodeControllerService ncs, CcId ccId, HyracksDataException exception) {
+        this.ncs = ncs;
+        this.ccId = ccId;
+        this.exception = exception;
+    }
+
+    @Override
+    public void run() {
+        try {
+            ncs.getHeartbeatManager(ccId).notifyAck(exception);
+        } catch (Exception e) {
+            LOGGER.info("failure processing heartbeat ack from {}", ccId, e);
+        }
+    }
+}
\ No newline at end of file
