diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index 0f08a22..992b85b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -91,7 +91,9 @@
             ncDataMap.put(nc, getNcDiagnosticFutures(nc));
         }
         ObjectNode result = OBJECT_MAPPER.createObjectNode();
-        result.putPOJO("cc", resolveFutures(ccFutureData));
+        if (!ccFutureData.isEmpty()) {
+            result.putPOJO("cc", resolveFutures(ccFutureData));
+        }
         List<Map<String, ?>> ncList = new ArrayList<>();
         for (Map.Entry<String, Map<String, Future<JsonNode>>> entry : ncDataMap.entrySet()) {
             final Map<String, JsonNode> ncMap = resolveFutures(entry.getValue());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index 8ca0947..8cb8cb6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -41,7 +41,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class NodeControllerDetailsApiServlet extends ClusterApiServlet {
@@ -142,7 +141,10 @@
 
     protected ObjectNode processNodeStats(IHyracksClientConnection hcc, String node) throws Exception {
         final String details = checkNullDetail(node, hcc.getNodeDetailsJSON(node, true, false));
-        ObjectNode json = (ObjectNode) OBJECT_MAPPER.readTree(details);
+        return processNodeDetailsJSON((ObjectNode) OBJECT_MAPPER.readTree(details));
+    }
+
+    protected ObjectNode processNodeDetailsJSON(ObjectNode json) {
         int index = json.get("rrd-ptr").asInt() - 1;
         json.remove("rrd-ptr");
 
@@ -150,7 +152,6 @@
         for (Iterator<String> iter = json.fieldNames(); iter.hasNext();) {
             keys.add(iter.next());
         }
-
         final ArrayNode gcNames = (ArrayNode) json.get("gc-names");
         final ArrayNode gcCollectionTimes = (ArrayNode) json.get("gc-collection-times");
         final ArrayNode gcCollectionCounts = (ArrayNode) json.get("gc-collection-counts");
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 06c92dd..84cb4bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -65,7 +65,8 @@
         switch (fn.getFunctionId()) {
             case REGISTER_NODE:
                 CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
-                ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs, rnf.getNodeRegistration()));
+                ccs.getWorkQueue()
+                        .schedule(new RegisterNodeWork(ccs, rnf.getNodeRegistration(), rnf.getRegistrationId()));
                 break;
             case UNREGISTER_NODE:
                 CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 415ca81..5c8bbdd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -18,338 +18,20 @@
  */
 package org.apache.hyracks.control.cc;
 
-import java.io.File;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.NodeControllerData;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
-import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
-import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
-import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
 import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class NodeControllerState {
-    private static final int RRD_SIZE = 720;
+public class NodeControllerState extends NodeControllerData {
 
     private final NodeControllerRemoteProxy nodeController;
 
-    private final NCConfig ncConfig;
-
-    private final NetworkAddress dataPort;
-
-    private final NetworkAddress datasetPort;
-
-    private final NetworkAddress messagingPort;
-
-    private final Set<JobId> activeJobIds;
-
-    private final String osName;
-
-    private final String arch;
-
-    private final String osVersion;
-
-    private final int nProcessors;
-
-    private final String vmName;
-
-    private final String vmVersion;
-
-    private final String vmVendor;
-
-    private final String classpath;
-
-    private final String libraryPath;
-
-    private final String bootClasspath;
-
-    private final List<String> inputArguments;
-
-    private final Map<String, String> systemProperties;
-
-    private final int pid;
-
-    private final HeartbeatSchema hbSchema;
-
-    private final long[] hbTime;
-
-    private final long[] heapInitSize;
-
-    private final long[] heapUsedSize;
-
-    private final long[] heapCommittedSize;
-
-    private final long[] heapMaxSize;
-
-    private final long[] nonheapInitSize;
-
-    private final long[] nonheapUsedSize;
-
-    private final long[] nonheapCommittedSize;
-
-    private final long[] nonheapMaxSize;
-
-    private final int[] threadCount;
-
-    private final int[] peakThreadCount;
-
-    private final double[] systemLoadAverage;
-
-    private final String[] gcNames;
-
-    private final long[][] gcCollectionCounts;
-
-    private final long[][] gcCollectionTimes;
-
-    private final long[] netPayloadBytesRead;
-
-    private final long[] netPayloadBytesWritten;
-
-    private final long[] netSignalingBytesRead;
-
-    private final long[] netSignalingBytesWritten;
-
-    private final long[] datasetNetPayloadBytesRead;
-
-    private final long[] datasetNetPayloadBytesWritten;
-
-    private final long[] datasetNetSignalingBytesRead;
-
-    private final long[] datasetNetSignalingBytesWritten;
-
-    private final long[] ipcMessagesSent;
-
-    private final long[] ipcMessageBytesSent;
-
-    private final long[] ipcMessagesReceived;
-
-    private final long[] ipcMessageBytesReceived;
-
-    private final long[] diskReads;
-
-    private final long[] diskWrites;
-
-    private int rrdPtr;
-
-    private volatile long lastHeartbeatNanoTime;
-
-    private NodeCapacity capacity;
-
     public NodeControllerState(NodeControllerRemoteProxy nodeController, NodeRegistration reg) {
+        super(reg);
         this.nodeController = nodeController;
-        ncConfig = reg.getNCConfig();
-        dataPort = reg.getDataPort();
-        datasetPort = reg.getDatasetPort();
-        messagingPort = reg.getMessagingPort();
-        activeJobIds = new HashSet<>();
-
-        osName = reg.getOSName();
-        arch = reg.getArch();
-        osVersion = reg.getOSVersion();
-        nProcessors = reg.getNProcessors();
-        vmName = reg.getVmName();
-        vmVersion = reg.getVmVersion();
-        vmVendor = reg.getVmVendor();
-        classpath = reg.getClasspath();
-        libraryPath = reg.getLibraryPath();
-        bootClasspath = reg.getBootClasspath();
-        inputArguments = reg.getInputArguments();
-        systemProperties = reg.getSystemProperties();
-        pid = reg.getPid();
-
-        hbSchema = reg.getHeartbeatSchema();
-
-        hbTime = new long[RRD_SIZE];
-        heapInitSize = new long[RRD_SIZE];
-        heapUsedSize = new long[RRD_SIZE];
-        heapCommittedSize = new long[RRD_SIZE];
-        heapMaxSize = new long[RRD_SIZE];
-        nonheapInitSize = new long[RRD_SIZE];
-        nonheapUsedSize = new long[RRD_SIZE];
-        nonheapCommittedSize = new long[RRD_SIZE];
-        nonheapMaxSize = new long[RRD_SIZE];
-        threadCount = new int[RRD_SIZE];
-        peakThreadCount = new int[RRD_SIZE];
-        systemLoadAverage = new double[RRD_SIZE];
-        GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
-        int gcN = gcInfos.length;
-        gcNames = new String[gcN];
-        for (int i = 0; i < gcN; ++i) {
-            gcNames[i] = gcInfos[i].getName();
-        }
-        gcCollectionCounts = new long[gcN][RRD_SIZE];
-        gcCollectionTimes = new long[gcN][RRD_SIZE];
-        netPayloadBytesRead = new long[RRD_SIZE];
-        netPayloadBytesWritten = new long[RRD_SIZE];
-        netSignalingBytesRead = new long[RRD_SIZE];
-        netSignalingBytesWritten = new long[RRD_SIZE];
-        datasetNetPayloadBytesRead = new long[RRD_SIZE];
-        datasetNetPayloadBytesWritten = new long[RRD_SIZE];
-        datasetNetSignalingBytesRead = new long[RRD_SIZE];
-        datasetNetSignalingBytesWritten = new long[RRD_SIZE];
-        ipcMessagesSent = new long[RRD_SIZE];
-        ipcMessageBytesSent = new long[RRD_SIZE];
-        ipcMessagesReceived = new long[RRD_SIZE];
-        ipcMessageBytesReceived = new long[RRD_SIZE];
-
-        diskReads = new long[RRD_SIZE];
-        diskWrites = new long[RRD_SIZE];
-
-        rrdPtr = 0;
-        capacity = reg.getCapacity();
-        touchHeartbeat();
-    }
-
-    public synchronized void notifyHeartbeat(HeartbeatData hbData) {
-        touchHeartbeat();
-        hbTime[rrdPtr] = System.currentTimeMillis();
-        heapInitSize[rrdPtr] = hbData.heapInitSize;
-        heapUsedSize[rrdPtr] = hbData.heapUsedSize;
-        heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
-        heapMaxSize[rrdPtr] = hbData.heapMaxSize;
-        nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
-        nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
-        nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
-        nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
-        threadCount[rrdPtr] = hbData.threadCount;
-        peakThreadCount[rrdPtr] = hbData.peakThreadCount;
-        systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
-        int gcN = hbSchema.getGarbageCollectorInfos().length;
-        for (int i = 0; i < gcN; ++i) {
-            gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
-            gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
-        }
-        netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
-        netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
-        netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
-        netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
-        datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
-        datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
-        datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
-        datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
-        ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
-        ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
-        ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
-        ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
-        diskReads[rrdPtr] = hbData.diskReads;
-        diskWrites[rrdPtr] = hbData.diskWrites;
-        rrdPtr = (rrdPtr + 1) % RRD_SIZE;
-    }
-
-    public void touchHeartbeat() {
-        lastHeartbeatNanoTime = System.nanoTime();
-    }
-
-    public long nanosSinceLastHeartbeat() {
-        return System.nanoTime() - lastHeartbeatNanoTime;
     }
 
     public NodeControllerRemoteProxy getNodeController() {
         return nodeController;
     }
-
-    public NCConfig getNCConfig() {
-        return ncConfig;
-    }
-
-    public Set<JobId> getActiveJobIds() {
-        return activeJobIds;
-    }
-
-    public NetworkAddress getDataPort() {
-        return dataPort;
-    }
-
-    public NetworkAddress getDatasetPort() {
-        return datasetPort;
-    }
-
-    public NetworkAddress getMessagingPort() {
-        return messagingPort;
-    }
-
-    public NodeCapacity getCapacity() {
-        return capacity;
-    }
-
-    public synchronized ObjectNode toSummaryJSON() {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode o = om.createObjectNode();
-        o.put("node-id", ncConfig.getNodeId());
-        o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
-        o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
-
-        return o;
-    }
-
-    public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) {
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode o = om.createObjectNode();
-
-        o.put("node-id", ncConfig.getNodeId());
-
-        if (includeConfig) {
-            o.put("os-name", osName);
-            o.put("arch", arch);
-            o.put("os-version", osVersion);
-            o.put("num-processors", nProcessors);
-            o.put("vm-name", vmName);
-            o.put("vm-version", vmVersion);
-            o.put("vm-vendor", vmVendor);
-            o.putPOJO("classpath", classpath.split(File.pathSeparator));
-            o.putPOJO("library-path", libraryPath.split(File.pathSeparator));
-            o.putPOJO("boot-classpath", bootClasspath.split(File.pathSeparator));
-            o.putPOJO("input-arguments", inputArguments);
-            o.putPOJO("system-properties", systemProperties);
-            o.put("pid", pid);
-        }
-        if (includeStats) {
-            o.putPOJO("date", new Date());
-            o.put("rrd-ptr", rrdPtr);
-            o.putPOJO("heartbeat-times", hbTime);
-            o.putPOJO("heap-init-sizes", heapInitSize);
-            o.putPOJO("heap-used-sizes", heapUsedSize);
-            o.putPOJO("heap-committed-sizes", heapCommittedSize);
-            o.putPOJO("heap-max-sizes", heapMaxSize);
-            o.putPOJO("nonheap-init-sizes", nonheapInitSize);
-            o.putPOJO("nonheap-used-sizes", nonheapUsedSize);
-            o.putPOJO("nonheap-committed-sizes", nonheapCommittedSize);
-            o.putPOJO("nonheap-max-sizes", nonheapMaxSize);
-            o.putPOJO("application-memory-budget", capacity.getMemoryByteSize());
-            o.putPOJO("application-cpu-core-budget", capacity.getCores());
-            o.putPOJO("thread-counts", threadCount);
-            o.putPOJO("peak-thread-counts", peakThreadCount);
-            o.putPOJO("system-load-averages", systemLoadAverage);
-            o.putPOJO("gc-names", gcNames);
-            o.putPOJO("gc-collection-counts", gcCollectionCounts);
-            o.putPOJO("gc-collection-times", gcCollectionTimes);
-            o.putPOJO("net-payload-bytes-read", netPayloadBytesRead);
-            o.putPOJO("net-payload-bytes-written", netPayloadBytesWritten);
-            o.putPOJO("net-signaling-bytes-read", netSignalingBytesRead);
-            o.putPOJO("net-signaling-bytes-written", netSignalingBytesWritten);
-            o.putPOJO("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
-            o.putPOJO("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
-            o.putPOJO("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
-            o.putPOJO("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
-            o.putPOJO("ipc-messages-sent", ipcMessagesSent);
-            o.putPOJO("ipc-message-bytes-sent", ipcMessageBytesSent);
-            o.putPOJO("ipc-messages-received", ipcMessagesReceived);
-            o.putPOJO("ipc-message-bytes-received", ipcMessageBytesReceived);
-            o.putPOJO("disk-reads", diskReads);
-            o.putPOJO("disk-writes", diskWrites);
-        }
-
-        return o;
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
index d827eba..b600e5d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GatherStateDumpsWork.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.control.cc.work;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 96f5f1b..de7d941 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -41,10 +41,12 @@
 
     private final ClusterControllerService ccs;
     private final NodeRegistration reg;
+    private final int registrationId;
 
-    public RegisterNodeWork(ClusterControllerService ccs, NodeRegistration reg) {
+    public RegisterNodeWork(ClusterControllerService ccs, NodeRegistration reg, int registrationId) {
         this.ccs = ccs;
         this.reg = reg;
+        this.registrationId = registrationId;
     }
 
     @Override
@@ -71,7 +73,7 @@
             params.setDistributedState(ccs.getContext().getDistributedState());
             params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
             params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
-            params.setRegistrationId(reg.getRegistrationId());
+            params.setRegistrationId(registrationId);
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Node registration failed", e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index bdb2aed..e13e23b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -79,5 +79,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
new file mode 100644
index 0000000..8dfc729
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common;
+
+import static org.apache.hyracks.util.JSONUtil.put;
+
+import java.io.File;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class NodeControllerData {
+
+    private static final int RRD_SIZE = 720;
+
+    private final NCConfig ncConfig;
+
+    private final NetworkAddress dataPort;
+
+    private final NetworkAddress datasetPort;
+
+    private final NetworkAddress messagingPort;
+
+    private final Set<JobId> activeJobIds;
+
+    private final String osName;
+
+    private final String arch;
+
+    private final String osVersion;
+
+    private final int nProcessors;
+
+    private final String vmName;
+
+    private final String vmVersion;
+
+    private final String vmVendor;
+
+    private final String classpath;
+
+    private final String libraryPath;
+
+    private final String bootClasspath;
+
+    private final List<String> inputArguments;
+
+    private final Map<String, String> systemProperties;
+
+    private final int pid;
+
+    private final HeartbeatSchema hbSchema;
+
+    private final long[] hbTime;
+
+    private final long[] heapInitSize;
+
+    private final long[] heapUsedSize;
+
+    private final long[] heapCommittedSize;
+
+    private final long[] heapMaxSize;
+
+    private final long[] nonheapInitSize;
+
+    private final long[] nonheapUsedSize;
+
+    private final long[] nonheapCommittedSize;
+
+    private final long[] nonheapMaxSize;
+
+    private final int[] threadCount;
+
+    private final int[] peakThreadCount;
+
+    private final double[] systemLoadAverage;
+
+    private final String[] gcNames;
+
+    private final long[][] gcCollectionCounts;
+
+    private final long[][] gcCollectionTimes;
+
+    private final long[] netPayloadBytesRead;
+
+    private final long[] netPayloadBytesWritten;
+
+    private final long[] netSignalingBytesRead;
+
+    private final long[] netSignalingBytesWritten;
+
+    private final long[] datasetNetPayloadBytesRead;
+
+    private final long[] datasetNetPayloadBytesWritten;
+
+    private final long[] datasetNetSignalingBytesRead;
+
+    private final long[] datasetNetSignalingBytesWritten;
+
+    private final long[] ipcMessagesSent;
+
+    private final long[] ipcMessageBytesSent;
+
+    private final long[] ipcMessagesReceived;
+
+    private final long[] ipcMessageBytesReceived;
+
+    private final long[] diskReads;
+
+    private final long[] diskWrites;
+
+    private int rrdPtr;
+
+    private volatile long lastHeartbeatNanoTime;
+
+    private NodeCapacity capacity;
+
+    public NodeControllerData(NodeRegistration reg) {
+        ncConfig = reg.getNCConfig();
+        dataPort = reg.getDataPort();
+        datasetPort = reg.getDatasetPort();
+        messagingPort = reg.getMessagingPort();
+        activeJobIds = new HashSet<>();
+
+        osName = reg.getOSName();
+        arch = reg.getArch();
+        osVersion = reg.getOSVersion();
+        nProcessors = reg.getNProcessors();
+        vmName = reg.getVmName();
+        vmVersion = reg.getVmVersion();
+        vmVendor = reg.getVmVendor();
+        classpath = reg.getClasspath();
+        libraryPath = reg.getLibraryPath();
+        bootClasspath = reg.getBootClasspath();
+        inputArguments = reg.getInputArguments();
+        systemProperties = reg.getSystemProperties();
+        pid = reg.getPid();
+
+        hbSchema = reg.getHeartbeatSchema();
+
+        hbTime = new long[RRD_SIZE];
+        heapInitSize = new long[RRD_SIZE];
+        heapUsedSize = new long[RRD_SIZE];
+        heapCommittedSize = new long[RRD_SIZE];
+        heapMaxSize = new long[RRD_SIZE];
+        nonheapInitSize = new long[RRD_SIZE];
+        nonheapUsedSize = new long[RRD_SIZE];
+        nonheapCommittedSize = new long[RRD_SIZE];
+        nonheapMaxSize = new long[RRD_SIZE];
+        threadCount = new int[RRD_SIZE];
+        peakThreadCount = new int[RRD_SIZE];
+        systemLoadAverage = new double[RRD_SIZE];
+        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
+        int gcN = gcInfos.length;
+        gcNames = new String[gcN];
+        for (int i = 0; i < gcN; ++i) {
+            gcNames[i] = gcInfos[i].getName();
+        }
+        gcCollectionCounts = new long[gcN][RRD_SIZE];
+        gcCollectionTimes = new long[gcN][RRD_SIZE];
+        netPayloadBytesRead = new long[RRD_SIZE];
+        netPayloadBytesWritten = new long[RRD_SIZE];
+        netSignalingBytesRead = new long[RRD_SIZE];
+        netSignalingBytesWritten = new long[RRD_SIZE];
+        datasetNetPayloadBytesRead = new long[RRD_SIZE];
+        datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+        datasetNetSignalingBytesRead = new long[RRD_SIZE];
+        datasetNetSignalingBytesWritten = new long[RRD_SIZE];
+        ipcMessagesSent = new long[RRD_SIZE];
+        ipcMessageBytesSent = new long[RRD_SIZE];
+        ipcMessagesReceived = new long[RRD_SIZE];
+        ipcMessageBytesReceived = new long[RRD_SIZE];
+
+        diskReads = new long[RRD_SIZE];
+        diskWrites = new long[RRD_SIZE];
+
+        rrdPtr = 0;
+        capacity = reg.getCapacity();
+        touchHeartbeat();
+    }
+
+    public synchronized void notifyHeartbeat(HeartbeatData hbData) {
+        touchHeartbeat();
+        hbTime[rrdPtr] = System.currentTimeMillis();
+        heapInitSize[rrdPtr] = hbData.heapInitSize;
+        heapUsedSize[rrdPtr] = hbData.heapUsedSize;
+        heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
+        heapMaxSize[rrdPtr] = hbData.heapMaxSize;
+        nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
+        nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
+        nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
+        nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
+        threadCount[rrdPtr] = hbData.threadCount;
+        peakThreadCount[rrdPtr] = hbData.peakThreadCount;
+        systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
+        int gcN = hbSchema.getGarbageCollectorInfos().length;
+        for (int i = 0; i < gcN; ++i) {
+            gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
+            gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
+        }
+        netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+        netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+        netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+        netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+        datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+        datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+        datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+        datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
+        ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+        ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+        ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+        ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+        diskReads[rrdPtr] = hbData.diskReads;
+        diskWrites[rrdPtr] = hbData.diskWrites;
+        rrdPtr = (rrdPtr + 1) % RRD_SIZE;
+    }
+
+    public void touchHeartbeat() {
+        lastHeartbeatNanoTime = System.nanoTime();
+    }
+
+    public long nanosSinceLastHeartbeat() {
+        return System.nanoTime() - lastHeartbeatNanoTime;
+    }
+
+    public NCConfig getNCConfig() {
+        return ncConfig;
+    }
+
+    public Set<JobId> getActiveJobIds() {
+        return activeJobIds;
+    }
+
+    public NetworkAddress getDataPort() {
+        return dataPort;
+    }
+
+    public NetworkAddress getDatasetPort() {
+        return datasetPort;
+    }
+
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
+
+    public NodeCapacity getCapacity() {
+        return capacity;
+    }
+
+    public synchronized ObjectNode toSummaryJSON() {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode o = om.createObjectNode();
+        put(o, "node-id", ncConfig.getNodeId());
+        put(o, "heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+        put(o, "system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+
+        return o;
+    }
+
+    public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) {
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode o = om.createObjectNode();
+
+        put(o, "node-id", ncConfig.getNodeId());
+
+        if (includeConfig) {
+            put(o, "os-name", osName);
+            put(o, "arch", arch);
+            put(o, "os-version", osVersion);
+            put(o, "num-processors", nProcessors);
+            put(o, "vm-name", vmName);
+            put(o, "vm-version", vmVersion);
+            put(o, "vm-vendor", vmVendor);
+            put(o, "classpath", StringUtils.split(classpath, File.pathSeparatorChar));
+            put(o, "library-path", StringUtils.split(libraryPath, File.pathSeparatorChar));
+            put(o, "boot-classpath", StringUtils.split(bootClasspath, File.pathSeparatorChar));
+            put(o, "input-arguments", inputArguments);
+            put(o, "input-arguments", inputArguments);
+            put(o, "system-properties", systemProperties);
+            put(o, "pid", pid);
+        }
+        if (includeStats) {
+            o.putPOJO("date", new Date());
+            put(o, "rrd-ptr", rrdPtr);
+            put(o, "heartbeat-times", hbTime);
+            put(o, "heap-init-sizes", heapInitSize);
+            put(o, "heap-used-sizes", heapUsedSize);
+            put(o, "heap-committed-sizes", heapCommittedSize);
+            put(o, "heap-max-sizes", heapMaxSize);
+            put(o, "nonheap-init-sizes", nonheapInitSize);
+            put(o, "nonheap-used-sizes", nonheapUsedSize);
+            put(o, "nonheap-committed-sizes", nonheapCommittedSize);
+            put(o, "nonheap-max-sizes", nonheapMaxSize);
+            put(o, "application-memory-budget", capacity.getMemoryByteSize());
+            put(o, "application-cpu-core-budget", capacity.getCores());
+            put(o, "thread-counts", threadCount);
+            put(o, "peak-thread-counts", peakThreadCount);
+            put(o, "system-load-averages", systemLoadAverage);
+            put(o, "gc-names", gcNames);
+            put(o, "gc-collection-counts", gcCollectionCounts);
+            put(o, "gc-collection-times", gcCollectionTimes);
+            put(o, "net-payload-bytes-read", netPayloadBytesRead);
+            put(o, "net-payload-bytes-written", netPayloadBytesWritten);
+            put(o, "net-signaling-bytes-read", netSignalingBytesRead);
+            put(o, "net-signaling-bytes-written", netSignalingBytesWritten);
+            put(o, "dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+            put(o, "dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
+            put(o, "dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+            put(o, "dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
+            put(o, "ipc-messages-sent", ipcMessagesSent);
+            put(o, "ipc-message-bytes-sent", ipcMessageBytesSent);
+            put(o, "ipc-messages-received", ipcMessagesReceived);
+            put(o, "ipc-message-bytes-received", ipcMessageBytesReceived);
+            put(o, "disk-reads", diskReads);
+            put(o, "disk-writes", diskWrites);
+        }
+
+        return o;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 5e3c3d4..6230f1d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -35,7 +35,7 @@
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 
 public interface IClusterController {
-    void registerNode(NodeRegistration reg) throws Exception;
+    void registerNode(NodeRegistration reg, int registrationId) throws Exception;
 
     void unregisterNode(String nodeId) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index a87c30a..76f5ad8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -22,7 +22,6 @@
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -73,10 +72,6 @@
 
     private final NodeCapacity capacity;
 
-    private final int registrationId;
-
-    private static final AtomicInteger nextRegistrationId = new AtomicInteger();
-
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
@@ -103,7 +98,6 @@
         this.messagingPort = messagingPort;
         this.capacity = capacity;
         this.pid = pid;
-        this.registrationId = nextRegistrationId.getAndIncrement();
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -189,8 +183,4 @@
     public int getPid() {
         return pid;
     }
-
-    public int getRegistrationId() {
-        return registrationId;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java
index 1dbb5b6..d09d0b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.common.heartbeat;
 
+import static org.apache.hyracks.control.common.utils.MXHelper.gcMXBeans;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -54,6 +56,11 @@
     public long diskWrites;
     public int numCores;
 
+    public HeartbeatData() {
+        gcCollectionCounts = new long[gcMXBeans.size()];
+        gcCollectionTimes = new long[gcMXBeans.size()];
+    }
+
     public void readFields(DataInput dis) throws IOException {
         heapInitSize = dis.readLong();
         heapUsedSize = dis.readLong();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 3d505f3..c10c8981 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -174,9 +174,11 @@
         private static final long serialVersionUID = 1L;
 
         private final NodeRegistration reg;
+        private final int registrationId;
 
-        public RegisterNodeFunction(NodeRegistration reg) {
+        public RegisterNodeFunction(NodeRegistration reg, int registrationId) {
             this.reg = reg;
+            this.registrationId = registrationId;
         }
 
         @Override
@@ -187,6 +189,10 @@
         public NodeRegistration getNodeRegistration() {
             return reg;
         }
+
+        public int getRegistrationId() {
+            return registrationId;
+        }
     }
 
     public static class UnregisterNodeFunction extends Function {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 027316e..bf35e6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -63,8 +63,8 @@
     }
 
     @Override
-    public void registerNode(NodeRegistration reg) throws Exception {
-        RegisterNodeFunction fn = new RegisterNodeFunction(reg);
+    public void registerNode(NodeRegistration reg, int registrationId) throws Exception {
+        RegisterNodeFunction fn = new RegisterNodeFunction(reg, registrationId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ConfigurationUtil.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ConfigurationUtil.java
index d2a455f2..d7cef4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ConfigurationUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ConfigurationUtil.java
@@ -22,7 +22,6 @@
     public static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
 
     private ConfigurationUtil() {
-
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/HyracksThreadFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/HyracksThreadFactory.java
index 6c50b09..dce9bf5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/HyracksThreadFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/HyracksThreadFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.control.common.utils;
 
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -40,12 +39,8 @@
     public Thread newThread(Runnable runnable) {
         Thread t = new Thread(runnable, "Executor-" + threadId.incrementAndGet() + ":" + identifier);
         t.setDaemon(true);
-        t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(Thread t, Throwable e) {
-                LOGGER.log(Level.ERROR, "Uncaught exception by " + t.getName(), e);
-            }
-        });
+        t.setUncaughtExceptionHandler(
+                (thread, e) -> LOGGER.log(Level.ERROR, "Uncaught exception by " + thread.getName(), e));
         return t;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/MXHelper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/MXHelper.java
new file mode 100644
index 0000000..cfe8c21
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/MXHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.utils;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.Collections;
+import java.util.List;
+
+public class MXHelper {
+    public static final MemoryMXBean memoryMXBean;
+    public static final List<GarbageCollectorMXBean> gcMXBeans;
+    public static final ThreadMXBean threadMXBean;
+    public static final RuntimeMXBean runtimeMXBean;
+    public static final OperatingSystemMXBean osMXBean;
+
+    static {
+        memoryMXBean = ManagementFactory.getMemoryMXBean();
+        gcMXBeans = Collections.unmodifiableList(ManagementFactory.getGarbageCollectorMXBeans());
+        threadMXBean = ManagementFactory.getThreadMXBean();
+        runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+        osMXBean = ManagementFactory.getOperatingSystemMXBean();
+    }
+
+    private MXHelper() {
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 63fffb4..1c6c98e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -54,9 +54,9 @@
         notifyAll();
     }
 
-    public synchronized CcId registerNode(NodeRegistration nodeRegistration) throws Exception {
+    public synchronized CcId registerNode(NodeRegistration nodeRegistration, int registrationId) throws Exception {
         registrationPending = true;
-        ccs.registerNode(nodeRegistration);
+        ccs.registerNode(nodeRegistration, registrationId);
         while (registrationPending) {
             wait();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0b7254d..0d0c535 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -18,16 +18,14 @@
  */
 package org.apache.hyracks.control.nc;
 
+import static org.apache.hyracks.control.common.utils.MXHelper.gcMXBeans;
+import static org.apache.hyracks.control.common.utils.MXHelper.memoryMXBean;
+import static org.apache.hyracks.control.common.utils.MXHelper.osMXBean;
+import static org.apache.hyracks.control.common.utils.MXHelper.runtimeMXBean;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
-import java.lang.management.OperatingSystemMXBean;
-import java.lang.management.RuntimeMXBean;
-import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
@@ -40,8 +38,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -65,13 +63,13 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.control.common.NodeControllerData;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
-import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
@@ -80,9 +78,9 @@
 import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
 import org.apache.hyracks.control.nc.dataset.DatasetPartitionManager;
+import org.apache.hyracks.control.nc.heartbeat.HeartbeatComputeTask;
+import org.apache.hyracks.control.nc.heartbeat.HeartbeatTask;
 import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
-import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
 import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
@@ -92,11 +90,9 @@
 import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
 import org.apache.hyracks.ipc.api.IIPCEventListener;
 import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
-import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.PidHelper;
 import org.apache.hyracks.util.trace.ITracer;
@@ -110,7 +106,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private static final double MEMORY_FUDGE_FACTOR = 0.8;
-    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
+    private static final int HEARTBEAT_REFRESH_MILLIS = 60000;
 
     private final NCConfig ncConfig;
 
@@ -162,31 +158,28 @@
 
     private final ILifeCycleComponentManager lccm;
 
-    private final MemoryMXBean memoryMXBean;
-
-    private final List<GarbageCollectorMXBean> gcMXBeans;
-
-    private final ThreadMXBean threadMXBean;
-
-    private final RuntimeMXBean runtimeMXBean;
-
-    private final OperatingSystemMXBean osMXBean;
-
     private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
 
     private final MemoryManager memoryManager;
 
     private StackTraceElement[] shutdownCallStack;
 
-    private IIOCounter ioCounter;
-
     private MessagingNetworkManager messagingNetManager;
 
     private final ConfigManager configManager;
 
     private final Map<CcId, AtomicLong> maxJobIds = new ConcurrentHashMap<>();
+
     private NodeStatus status = NodeStatus.BOOTING;
 
+    private NodeRegistration nodeRegistration;
+
+    private NodeControllerData ncData;
+
+    private HeartbeatComputeTask hbTask;
+
+    private static final AtomicInteger nextRegistrationId = new AtomicInteger();
+
     static {
         ExitUtil.init();
     }
@@ -226,15 +219,9 @@
             timer = new Timer(true);
             serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
                     new File(new File(NodeControllerService.class.getName()), id));
-            memoryMXBean = ManagementFactory.getMemoryMXBean();
-            gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
-            threadMXBean = ManagementFactory.getThreadMXBean();
-            runtimeMXBean = ManagementFactory.getRuntimeMXBean();
-            osMXBean = ManagementFactory.getOperatingSystemMXBean();
             getNodeControllerInfosAcceptor = new MutableObject<>();
             memoryManager =
                     new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
-            ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
         } catch (Throwable th) { // NOSONAR will be re-thrown
             CleanupUtils.close(ioManager, th);
             throw th;
@@ -276,7 +263,7 @@
         fv.setValue(ncInfos);
     }
 
-    private void init() throws Exception {
+    private void init() {
         datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
                 ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
         datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
@@ -308,11 +295,15 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-
-        this.primaryCcId = addCc(new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
+        initNodeControllerState();
+        hbTask = new HeartbeatComputeTask(this);
+        primaryCcId = addCc(new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()));
 
         workQueue.start();
 
+        // Schedule heartbeat data updates
+        timer.schedule(hbTask, HEARTBEAT_REFRESH_MILLIS, HEARTBEAT_REFRESH_MILLIS);
+
         // Schedule tracing a human-readable datetime
         timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
 
@@ -320,6 +311,34 @@
         application.startupCompleted();
     }
 
+    private void initNodeControllerState() {
+        // Use "public" versions of network addresses and ports, if defined
+        InetSocketAddress ncAddress;
+        if (ncConfig.getClusterPublicPort() == 0) {
+            ncAddress = ipc.getSocketAddress();
+        } else {
+            ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
+        }
+        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+        for (int i = 0; i < gcInfos.length; ++i) {
+            gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+        }
+        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+
+        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
+        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+        NetworkAddress messagingAddress =
+                messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
+        nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress, osMXBean.getName(),
+                osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(),
+                runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
+                runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+                runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
+                PidHelper.getPid());
+
+        ncData = new NodeControllerData(nodeRegistration);
+    }
+
     public CcId addCc(InetSocketAddress ccAddress) throws Exception {
         synchronized (ccLock) {
             LOGGER.info("addCc: {}", ccAddress);
@@ -348,7 +367,7 @@
         }
     }
 
-    public void makePrimaryCc(InetSocketAddress ccAddress) throws Exception {
+    public void makePrimaryCc(InetSocketAddress ccAddress) {
         LOGGER.info("makePrimaryCc: {}", ccAddress);
         if (ccAddress.isUnresolved()) {
             throw new IllegalArgumentException("must use resolved InetSocketAddress");
@@ -393,33 +412,12 @@
         }
     }
 
-    protected CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+    private CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
-        HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
-        for (int i = 0; i < gcInfos.length; ++i) {
-            gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
-        }
-        HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
-        // Use "public" versions of network addresses and ports, if defined
-        InetSocketAddress ncAddress;
-        if (ncConfig.getClusterPublicPort() == 0) {
-            ncAddress = ipc.getSocketAddress();
-        } else {
-            ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
-        }
-        NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
-        NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        NetworkAddress messagingAddress =
-                messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
-        NodeRegistration nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
-                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress,
-                application.getCapacity(), PidHelper.getPid());
 
-        pendingRegistrations.put(nodeRegistration.getRegistrationId(), ccc);
-        CcId ccId = ccc.registerNode(nodeRegistration);
+        int registrationId = nextRegistrationId.incrementAndGet();
+        pendingRegistrations.put(registrationId, ccc);
+        CcId ccId = ccc.registerNode(nodeRegistration, registrationId);
         ccMap.put(ccId, ccc);
         ccAddressMap.put(ccAddress, ccId);
         Serializable distributedState = ccc.getNodeParameters().getDistributedState();
@@ -432,8 +430,9 @@
 
         // Start heartbeat generator.
         if (!heartbeatThreads.containsKey(ccId)) {
-            Thread heartbeatThread =
-                    new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
+            Thread heartbeatThread = new Thread(
+                    new HeartbeatTask(getId(), hbTask.getHeartbeatData(), ccs, nodeParameters.getHeartbeatPeriod()),
+                    id + "-Heartbeat");
             heartbeatThread.setPriority(Thread.MAX_PRIORITY);
             heartbeatThread.setDaemon(true);
             heartbeatThread.start();
@@ -642,103 +641,39 @@
         this.status = status;
     }
 
-    private class HeartbeatTask implements Runnable {
-        private final Semaphore delayBlock = new Semaphore(0);
-        private final IClusterController cc;
-        private final long heartbeatPeriodNanos;
+    public NodeControllerData getNodeControllerData() {
+        return ncData;
+    }
 
-        private final HeartbeatData hbData;
+    public IPCSystem getIpcSystem() {
+        return ipc;
+    }
 
-        HeartbeatTask(IClusterController cc, long heartbeatPeriod) {
-            this.cc = cc;
-            this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
-            hbData = new HeartbeatData();
-            hbData.gcCollectionCounts = new long[gcMXBeans.size()];
-            hbData.gcCollectionTimes = new long[gcMXBeans.size()];
+    public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
+        getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
+    }
+
+    public IDatasetPartitionManager getDatasetPartitionManager() {
+        return datasetPartitionManager;
+    }
+
+    public MessagingNetworkManager getMessagingNetworkManager() {
+        return messagingNetManager;
+    }
+
+    private static INCApplication getApplication(NCConfig config)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        if (config.getAppClass() != null) {
+            Class<?> c = Class.forName(config.getAppClass());
+            return (INCApplication) c.newInstance();
+        } else {
+            return BaseNCApplication.INSTANCE;
         }
+    }
 
-        @Override
-        public void run() {
-            while (!Thread.currentThread().isInterrupted()) {
-                try {
-                    long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos;
-                    final boolean success = execute();
-                    sleepUntilNextFire(success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS);
-                } catch (InterruptedException e) { // NOSONAR
-                    break;
-                }
-            }
-            LOGGER.log(Level.INFO, "Heartbeat thread interrupted; shutting down");
-        }
-
-        private void sleepUntilNextFire(long delayNanos) throws InterruptedException {
-            if (delayNanos > 0) {
-                delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire
-            } else {
-                LOGGER.warn("After sending heartbeat, next one is already late by "
-                        + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without delay");
-            }
-        }
-
-        private boolean execute() throws InterruptedException {
-            MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
-            hbData.heapInitSize = heapUsage.getInit();
-            hbData.heapUsedSize = heapUsage.getUsed();
-            hbData.heapCommittedSize = heapUsage.getCommitted();
-            hbData.heapMaxSize = heapUsage.getMax();
-            MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
-            hbData.nonheapInitSize = nonheapUsage.getInit();
-            hbData.nonheapUsedSize = nonheapUsage.getUsed();
-            hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
-            hbData.nonheapMaxSize = nonheapUsage.getMax();
-            hbData.threadCount = threadMXBean.getThreadCount();
-            hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
-            hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
-            hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
-            int gcN = gcMXBeans.size();
-            for (int i = 0; i < gcN; ++i) {
-                GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
-                hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
-                hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
-            }
-
-            MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
-            hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
-            hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
-            hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
-            hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
-
-            MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
-            hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
-            hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
-            hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
-            hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
-
-            IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
-            hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
-            hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
-            hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
-            hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
-
-            hbData.diskReads = ioCounter.getReads();
-            hbData.diskWrites = ioCounter.getWrites();
-            hbData.numCores = Runtime.getRuntime().availableProcessors();
-
-            try {
-                cc.nodeHeartbeat(id, hbData);
-                LOGGER.log(Level.DEBUG, "Successfully sent heartbeat");
-                return true;
-            } catch (InterruptedException e) {
-                throw e;
-            } catch (Exception e) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will retry after 1s", e);
-                } else {
-                    LOGGER.log(Level.ERROR, "Exception sending heartbeat; will retry after 1s: " + e.toString());
-                }
-                return false;
-            }
-        }
+    @Override
+    public Object getApplicationContext() {
+        return application.getApplicationContext();
     }
 
     private class ProfileDumpTask extends TimerTask {
@@ -785,32 +720,4 @@
             }
         }
     }
-
-    public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
-        getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
-    }
-
-    public IDatasetPartitionManager getDatasetPartitionManager() {
-        return datasetPartitionManager;
-    }
-
-    public MessagingNetworkManager getMessagingNetworkManager() {
-        return messagingNetManager;
-    }
-
-    private static INCApplication getApplication(NCConfig config)
-            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-        if (config.getAppClass() != null) {
-            Class<?> c = Class.forName(config.getAppClass());
-            return (INCApplication) c.newInstance();
-        } else {
-            return BaseNCApplication.INSTANCE;
-        }
-    }
-
-    @Override
-    public Object getApplicationContext() {
-        return application.getApplicationContext();
-    }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
new file mode 100644
index 0000000..4965b85
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatComputeTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.heartbeat;
+
+import static org.apache.hyracks.control.common.utils.MXHelper.gcMXBeans;
+import static org.apache.hyracks.control.common.utils.MXHelper.memoryMXBean;
+import static org.apache.hyracks.control.common.utils.MXHelper.osMXBean;
+import static org.apache.hyracks.control.common.utils.MXHelper.threadMXBean;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.TimerTask;
+
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
+import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
+import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HeartbeatComputeTask extends TimerTask {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final IIOCounter ioCounter;
+
+    private NodeControllerService ncs;
+    private final HeartbeatData hbData;
+
+    public HeartbeatComputeTask(NodeControllerService ncs) {
+        this.ncs = ncs;
+        hbData = new HeartbeatData();
+        ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
+        run();
+    }
+
+    public HeartbeatData getHeartbeatData() {
+        return hbData;
+    }
+
+    @Override
+    public void run() {
+        synchronized (hbData) {
+            MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
+            hbData.heapInitSize = heapUsage.getInit();
+            hbData.heapUsedSize = heapUsage.getUsed();
+            hbData.heapCommittedSize = heapUsage.getCommitted();
+            hbData.heapMaxSize = heapUsage.getMax();
+            MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
+            hbData.nonheapInitSize = nonheapUsage.getInit();
+            hbData.nonheapUsedSize = nonheapUsage.getUsed();
+            hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
+            hbData.nonheapMaxSize = nonheapUsage.getMax();
+            hbData.threadCount = threadMXBean.getThreadCount();
+            hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
+            hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
+            hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
+            int gcN = gcMXBeans.size();
+            for (int i = 0; i < gcN; ++i) {
+                GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
+                hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
+                hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
+            }
+
+            MuxDemuxPerformanceCounters netPC = ncs.getNetworkManager().getPerformanceCounters();
+            hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+            hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+            hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+            hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+            MuxDemuxPerformanceCounters datasetNetPC = ncs.getDatasetNetworkManager().getPerformanceCounters();
+            hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
+            hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
+            hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
+            hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
+
+            IPCPerformanceCounters ipcPC = ncs.getIpcSystem().getPerformanceCounters();
+            hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+            hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+            hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+            hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
+            hbData.diskReads = ioCounter.getReads();
+            hbData.diskWrites = ioCounter.getWrites();
+            hbData.numCores = Runtime.getRuntime().availableProcessors();
+
+            ncs.getNodeControllerData().notifyHeartbeat(hbData);
+        }
+        LOGGER.log(Level.DEBUG, "Successfully refreshed heartbeat data");
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
new file mode 100644
index 0000000..9160e46
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/heartbeat/HeartbeatTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.heartbeat;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HeartbeatTask implements Runnable {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private static final long ONE_SECOND_NANOS = TimeUnit.SECONDS.toNanos(1);
+    private final String ncId;
+    private final HeartbeatData hbData;
+    private final Semaphore delayBlock = new Semaphore(0);
+    private final IClusterController cc;
+    private final long heartbeatPeriodNanos;
+
+    public HeartbeatTask(String ncId, HeartbeatData hbData, IClusterController cc, long heartbeatPeriod) {
+        this.ncId = ncId;
+        this.hbData = hbData;
+        this.cc = cc;
+        this.heartbeatPeriodNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatPeriod);
+    }
+
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                long nextFireNanoTime = System.nanoTime() + heartbeatPeriodNanos;
+                final boolean success = execute();
+                long delayNanos = success ? nextFireNanoTime - System.nanoTime() : ONE_SECOND_NANOS;
+                if (delayNanos > 0) {
+                    delayBlock.tryAcquire(delayNanos, TimeUnit.NANOSECONDS); //NOSONAR - ignore result of tryAcquire
+                } else {
+                    LOGGER.warn("After sending heartbeat, next one is already late by "
+                            + TimeUnit.NANOSECONDS.toMillis(-delayNanos) + "ms; sending without delay");
+                }
+            } catch (InterruptedException e) { // NOSONAR
+                break;
+            }
+        }
+        LOGGER.log(Level.INFO, "Heartbeat task interrupted; shutting down");
+    }
+
+    private boolean execute() throws InterruptedException {
+        try {
+            synchronized (hbData) {
+                cc.nodeHeartbeat(ncId, hbData);
+            }
+            LOGGER.log(Level.DEBUG, "Successfully sent heartbeat");
+            return true;
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.log(Level.DEBUG, "Exception sending heartbeat; will retry after 1s", e);
+            } else {
+                LOGGER.log(Level.ERROR, "Exception sending heartbeat; will retry after 1s: " + e.toString());
+            }
+            return false;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
index 158ab66..6085c1c 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
@@ -20,9 +20,13 @@
 
 import java.io.IOException;
 import java.io.Writer;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -33,6 +37,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class JSONUtil {
 
@@ -84,47 +89,46 @@
         throw new UnsupportedOperationException(o.getClass().getSimpleName());
     }
 
-    private static StringBuilder appendObj(StringBuilder builder, JsonNode jobj, int indent) {
-        StringBuilder sb = builder.append("{\n");
+    private static StringBuilder appendObj(final StringBuilder sb, final JsonNode outer, final int indent) {
+        sb.append("{\n");
         boolean first = true;
-        for (Iterator<JsonNode> it = jobj.iterator(); it.hasNext();) {
-            final String key = it.next().asText();
+        for (JsonNode inner : outer) {
+            final String key = inner.asText();
             if (first) {
                 first = false;
             } else {
-                sb = sb.append(",\n");
+                sb.append(",\n");
             }
-            sb = indent(sb, indent + 1);
-            sb = quote(sb, key);
-            sb = sb.append(": ");
-            if (jobj.get(key).isArray()) {
-                sb = appendAry(sb, jobj.get(key), indent + 1);
-            } else if (jobj.get(key).isObject()) {
-                sb = appendObj(sb, jobj.get(key), indent + 1);
-            } else {
-                sb = appendOrd(sb, jobj.get(key), indent + 1);
-            }
+            indent(sb, indent + 1);
+            quote(sb, key);
+            sb.append(": ");
+            appendVal(sb, outer.get(key), indent);
         }
-        sb = sb.append("\n");
+        sb.append("\n");
         return indent(sb, indent).append("}");
     }
 
-    private static StringBuilder appendAry(StringBuilder builder, JsonNode jarr, int indent) {
-        StringBuilder sb = builder.append("[\n");
+    private static StringBuilder appendVal(final StringBuilder sb, final JsonNode value, final int indent) {
+        if (value.isArray()) {
+            appendAry(sb, value, indent + 1);
+        } else if (value.isObject()) {
+            appendObj(sb, value, indent + 1);
+        } else {
+            appendOrd(sb, value, indent + 1);
+        }
+        return sb;
+    }
+
+    private static StringBuilder appendAry(final StringBuilder sb, JsonNode jarr, int indent) {
+        sb.append("[\n");
         for (int i = 0; i < jarr.size(); ++i) {
             if (i > 0) {
-                sb = sb.append(",\n");
+                sb.append(",\n");
             }
-            sb = indent(sb, indent + 1);
-            if (jarr.get(i).isArray()) {
-                sb = appendAry(sb, jarr.get(i), indent + 1);
-            } else if (jarr.get(i).isObject()) {
-                sb = appendObj(sb, jarr.get(i), indent + 1);
-            } else {
-                sb = appendOrd(sb, jarr.get(i), indent + 1);
-            }
+            indent(sb, indent + 1);
+            appendVal(sb, jarr.get(i), indent);
         }
-        sb = sb.append("\n");
+        sb.append("\n");
         return indent(sb, indent).append("]");
     }
 
@@ -228,4 +232,48 @@
         aString.append(" }");
         return aString.toString();
     }
+
+    public static void put(ObjectNode o, String name, int value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, String value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, long value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, double value) {
+        o.put(name, value);
+    }
+
+    public static void put(ObjectNode o, String name, long[] elements) {
+        LongStream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, long[][] elements) {
+        Stream.of(elements).forEachOrdered(o.putArray(name)::addPOJO);
+    }
+
+    public static void put(ObjectNode o, String name, int[] elements) {
+        IntStream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, double[] elements) {
+        DoubleStream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, Map<String, String> map) {
+        map.forEach(o.putObject(name)::put);
+    }
+
+    public static void put(ObjectNode o, String name, String[] elements) {
+        Stream.of(elements).forEachOrdered(o.putArray(name)::add);
+    }
+
+    public static void put(ObjectNode o, String name, List<String> elements) {
+        elements.forEach(o.putArray(name)::add);
+    }
 }
