notifyHeartbeat in some non-heartbeat events
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 67ba2b6..3ee036d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -33,276 +33,281 @@
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
 
 public class NodeControllerState {
-    private static final int RRD_SIZE = 720;
+	private static final int RRD_SIZE = 720;
 
-    private final INodeController nodeController;
+	private final INodeController nodeController;
 
-    private final NCConfig ncConfig;
+	private final NCConfig ncConfig;
 
-    private final NetworkAddress dataPort;
+	private final NetworkAddress dataPort;
 
-    private final NetworkAddress datasetPort;
+	private final NetworkAddress datasetPort;
 
-    private final Set<JobId> activeJobIds;
+	private final Set<JobId> activeJobIds;
 
-    private final String osName;
+	private final String osName;
 
-    private final String arch;
+	private final String arch;
 
-    private final String osVersion;
+	private final String osVersion;
 
-    private final int nProcessors;
+	private final int nProcessors;
 
-    private final String vmName;
+	private final String vmName;
 
-    private final String vmVersion;
+	private final String vmVersion;
 
-    private final String vmVendor;
+	private final String vmVendor;
 
-    private final String classpath;
+	private final String classpath;
 
-    private final String libraryPath;
+	private final String libraryPath;
 
-    private final String bootClasspath;
+	private final String bootClasspath;
 
-    private final List<String> inputArguments;
+	private final List<String> inputArguments;
 
-    private final Map<String, String> systemProperties;
+	private final Map<String, String> systemProperties;
 
-    private final HeartbeatSchema hbSchema;
+	private final HeartbeatSchema hbSchema;
 
-    private final long[] hbTime;
+	private final long[] hbTime;
 
-    private final long[] heapInitSize;
+	private final long[] heapInitSize;
 
-    private final long[] heapUsedSize;
+	private final long[] heapUsedSize;
 
-    private final long[] heapCommittedSize;
+	private final long[] heapCommittedSize;
 
-    private final long[] heapMaxSize;
+	private final long[] heapMaxSize;
 
-    private final long[] nonheapInitSize;
+	private final long[] nonheapInitSize;
 
-    private final long[] nonheapUsedSize;
+	private final long[] nonheapUsedSize;
 
-    private final long[] nonheapCommittedSize;
+	private final long[] nonheapCommittedSize;
 
-    private final long[] nonheapMaxSize;
+	private final long[] nonheapMaxSize;
 
-    private final int[] threadCount;
+	private final int[] threadCount;
 
-    private final int[] peakThreadCount;
+	private final int[] peakThreadCount;
 
-    private final double[] systemLoadAverage;
+	private final double[] systemLoadAverage;
 
-    private final String[] gcNames;
+	private final String[] gcNames;
 
-    private final long[][] gcCollectionCounts;
+	private final long[][] gcCollectionCounts;
 
-    private final long[][] gcCollectionTimes;
+	private final long[][] gcCollectionTimes;
 
-    private final long[] netPayloadBytesRead;
+	private final long[] netPayloadBytesRead;
 
-    private final long[] netPayloadBytesWritten;
+	private final long[] netPayloadBytesWritten;
 
-    private final long[] netSignalingBytesRead;
+	private final long[] netSignalingBytesRead;
 
-    private final long[] netSignalingBytesWritten;
+	private final long[] netSignalingBytesWritten;
 
-    private final long[] datasetNetPayloadBytesRead;
+	private final long[] datasetNetPayloadBytesRead;
 
-    private final long[] datasetNetPayloadBytesWritten;
+	private final long[] datasetNetPayloadBytesWritten;
 
-    private final long[] datasetNetSignalingBytesRead;
+	private final long[] datasetNetSignalingBytesRead;
 
-    private final long[] datasetNetSignalingBytesWritten;
+	private final long[] datasetNetSignalingBytesWritten;
 
-    private final long[] ipcMessagesSent;
+	private final long[] ipcMessagesSent;
 
-    private final long[] ipcMessageBytesSent;
+	private final long[] ipcMessageBytesSent;
 
-    private final long[] ipcMessagesReceived;
+	private final long[] ipcMessagesReceived;
 
-    private final long[] ipcMessageBytesReceived;
+	private final long[] ipcMessageBytesReceived;
 
-    private int rrdPtr;
+	private int rrdPtr;
 
-    private int lastHeartbeatDuration;
+	private int lastHeartbeatDuration;
 
-    public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
-        this.nodeController = nodeController;
-        ncConfig = reg.getNCConfig();
-        dataPort = reg.getDataPort();
-        datasetPort = reg.getDatasetPort();
-        activeJobIds = new HashSet<JobId>();
+	public NodeControllerState(INodeController nodeController,
+			NodeRegistration reg) {
+		this.nodeController = nodeController;
+		ncConfig = reg.getNCConfig();
+		dataPort = reg.getDataPort();
+		datasetPort = reg.getDatasetPort();
+		activeJobIds = new HashSet<JobId>();
 
-        osName = reg.getOSName();
-        arch = reg.getArch();
-        osVersion = reg.getOSVersion();
-        nProcessors = reg.getNProcessors();
-        vmName = reg.getVmName();
-        vmVersion = reg.getVmVersion();
-        vmVendor = reg.getVmVendor();
-        classpath = reg.getClasspath();
-        libraryPath = reg.getLibraryPath();
-        bootClasspath = reg.getBootClasspath();
-        inputArguments = reg.getInputArguments();
-        systemProperties = reg.getSystemProperties();
+		osName = reg.getOSName();
+		arch = reg.getArch();
+		osVersion = reg.getOSVersion();
+		nProcessors = reg.getNProcessors();
+		vmName = reg.getVmName();
+		vmVersion = reg.getVmVersion();
+		vmVendor = reg.getVmVendor();
+		classpath = reg.getClasspath();
+		libraryPath = reg.getLibraryPath();
+		bootClasspath = reg.getBootClasspath();
+		inputArguments = reg.getInputArguments();
+		systemProperties = reg.getSystemProperties();
 
-        hbSchema = reg.getHeartbeatSchema();
+		hbSchema = reg.getHeartbeatSchema();
 
-        hbTime = new long[RRD_SIZE];
-        heapInitSize = new long[RRD_SIZE];
-        heapUsedSize = new long[RRD_SIZE];
-        heapCommittedSize = new long[RRD_SIZE];
-        heapMaxSize = new long[RRD_SIZE];
-        nonheapInitSize = new long[RRD_SIZE];
-        nonheapUsedSize = new long[RRD_SIZE];
-        nonheapCommittedSize = new long[RRD_SIZE];
-        nonheapMaxSize = new long[RRD_SIZE];
-        threadCount = new int[RRD_SIZE];
-        peakThreadCount = new int[RRD_SIZE];
-        systemLoadAverage = new double[RRD_SIZE];
-        GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
-        int gcN = gcInfos.length;
-        gcNames = new String[gcN];
-        for (int i = 0; i < gcN; ++i) {
-            gcNames[i] = gcInfos[i].getName();
-        }
-        gcCollectionCounts = new long[gcN][RRD_SIZE];
-        gcCollectionTimes = new long[gcN][RRD_SIZE];
-        netPayloadBytesRead = new long[RRD_SIZE];
-        netPayloadBytesWritten = new long[RRD_SIZE];
-        netSignalingBytesRead = new long[RRD_SIZE];
-        netSignalingBytesWritten = new long[RRD_SIZE];
-        datasetNetPayloadBytesRead = new long[RRD_SIZE];
-        datasetNetPayloadBytesWritten = new long[RRD_SIZE];
-        datasetNetSignalingBytesRead = new long[RRD_SIZE];
-        datasetNetSignalingBytesWritten = new long[RRD_SIZE];
-        ipcMessagesSent = new long[RRD_SIZE];
-        ipcMessageBytesSent = new long[RRD_SIZE];
-        ipcMessagesReceived = new long[RRD_SIZE];
-        ipcMessageBytesReceived = new long[RRD_SIZE];
+		hbTime = new long[RRD_SIZE];
+		heapInitSize = new long[RRD_SIZE];
+		heapUsedSize = new long[RRD_SIZE];
+		heapCommittedSize = new long[RRD_SIZE];
+		heapMaxSize = new long[RRD_SIZE];
+		nonheapInitSize = new long[RRD_SIZE];
+		nonheapUsedSize = new long[RRD_SIZE];
+		nonheapCommittedSize = new long[RRD_SIZE];
+		nonheapMaxSize = new long[RRD_SIZE];
+		threadCount = new int[RRD_SIZE];
+		peakThreadCount = new int[RRD_SIZE];
+		systemLoadAverage = new double[RRD_SIZE];
+		GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
+		int gcN = gcInfos.length;
+		gcNames = new String[gcN];
+		for (int i = 0; i < gcN; ++i) {
+			gcNames[i] = gcInfos[i].getName();
+		}
+		gcCollectionCounts = new long[gcN][RRD_SIZE];
+		gcCollectionTimes = new long[gcN][RRD_SIZE];
+		netPayloadBytesRead = new long[RRD_SIZE];
+		netPayloadBytesWritten = new long[RRD_SIZE];
+		netSignalingBytesRead = new long[RRD_SIZE];
+		netSignalingBytesWritten = new long[RRD_SIZE];
+		datasetNetPayloadBytesRead = new long[RRD_SIZE];
+		datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+		datasetNetSignalingBytesRead = new long[RRD_SIZE];
+		datasetNetSignalingBytesWritten = new long[RRD_SIZE];
+		ipcMessagesSent = new long[RRD_SIZE];
+		ipcMessageBytesSent = new long[RRD_SIZE];
+		ipcMessagesReceived = new long[RRD_SIZE];
+		ipcMessageBytesReceived = new long[RRD_SIZE];
 
-        rrdPtr = 0;
-    }
+		rrdPtr = 0;
+	}
 
-    public void notifyHeartbeat(HeartbeatData hbData) {
-        lastHeartbeatDuration = 0;
-
-        hbTime[rrdPtr] = System.currentTimeMillis();
-        heapInitSize[rrdPtr] = hbData.heapInitSize;
-        heapUsedSize[rrdPtr] = hbData.heapUsedSize;
-        heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
-        heapMaxSize[rrdPtr] = hbData.heapMaxSize;
-        nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
-        nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
-        nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
-        nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
-        threadCount[rrdPtr] = hbData.threadCount;
-        peakThreadCount[rrdPtr] = hbData.peakThreadCount;
-        systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
-        int gcN = hbSchema.getGarbageCollectorInfos().length;
-        for (int i = 0; i < gcN; ++i) {
-            gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
-            gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
-        }
-        netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
-        netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
-        netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
-        netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
-        datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
-        datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
-        datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
-        datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
-        ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
-        ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
-        ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
-        ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
-        rrdPtr = (rrdPtr + 1) % RRD_SIZE;
-    }
+	public void notifyHeartbeat(HeartbeatData hbData) {
+		lastHeartbeatDuration = 0;
+		hbTime[rrdPtr] = System.currentTimeMillis();
+		if (hbData != null) {
+			heapInitSize[rrdPtr] = hbData.heapInitSize;
+			heapUsedSize[rrdPtr] = hbData.heapUsedSize;
+			heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
+			heapMaxSize[rrdPtr] = hbData.heapMaxSize;
+			nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
+			nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
+			nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
+			nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
+			threadCount[rrdPtr] = hbData.threadCount;
+			peakThreadCount[rrdPtr] = hbData.peakThreadCount;
+			systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
+			int gcN = hbSchema.getGarbageCollectorInfos().length;
+			for (int i = 0; i < gcN; ++i) {
+				gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
+				gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
+			}
+			netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+			netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+			netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+			netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+			datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+			datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+			datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+			datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
+			ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+			ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+			ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+			ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+		}
+		rrdPtr = (rrdPtr + 1) % RRD_SIZE;
+	}
 
-    public int incrementLastHeartbeatDuration() {
-        return lastHeartbeatDuration++;
-    }
+	public int incrementLastHeartbeatDuration() {
+		return lastHeartbeatDuration++;
+	}
 
-    public int getLastHeartbeatDuration() {
-        return lastHeartbeatDuration;
-    }
+	public int getLastHeartbeatDuration() {
+		return lastHeartbeatDuration;
+	}
 
-    public INodeController getNodeController() {
-        return nodeController;
-    }
+	public INodeController getNodeController() {
+		return nodeController;
+	}
 
-    public NCConfig getNCConfig() {
-        return ncConfig;
-    }
+	public NCConfig getNCConfig() {
+		return ncConfig;
+	}
 
-    public Set<JobId> getActiveJobIds() {
-        return activeJobIds;
-    }
+	public Set<JobId> getActiveJobIds() {
+		return activeJobIds;
+	}
 
-    public NetworkAddress getDataPort() {
-        return dataPort;
-    }
+	public NetworkAddress getDataPort() {
+		return dataPort;
+	}
 
-    public NetworkAddress getDatasetPort() {
-        return datasetPort;
-    }
+	public NetworkAddress getDatasetPort() {
+		return datasetPort;
+	}
 
-    public JSONObject toSummaryJSON() throws JSONException {
-        JSONObject o = new JSONObject();
-        o.put("node-id", ncConfig.nodeId);
-        o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
-        o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+	public JSONObject toSummaryJSON() throws JSONException {
+		JSONObject o = new JSONObject();
+		o.put("node-id", ncConfig.nodeId);
+		o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+		o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1)
+				% RRD_SIZE]);
 
-        return o;
-    }
+		return o;
+	}
 
-    public JSONObject toDetailedJSON() throws JSONException {
-        JSONObject o = new JSONObject();
+	public JSONObject toDetailedJSON() throws JSONException {
+		JSONObject o = new JSONObject();
 
-        o.put("node-id", ncConfig.nodeId);
-        o.put("os-name", osName);
-        o.put("arch", arch);
-        o.put("os-version", osVersion);
-        o.put("num-processors", nProcessors);
-        o.put("vm-name", vmName);
-        o.put("vm-version", vmVersion);
-        o.put("vm-vendor", vmVendor);
-        o.put("classpath", classpath);
-        o.put("library-path", libraryPath);
-        o.put("boot-classpath", bootClasspath);
-        o.put("input-arguments", new JSONArray(inputArguments));
-        o.put("rrd-ptr", rrdPtr);
-        o.put("heartbeat-times", hbTime);
-        o.put("heap-init-sizes", heapInitSize);
-        o.put("heap-used-sizes", heapUsedSize);
-        o.put("heap-committed-sizes", heapCommittedSize);
-        o.put("heap-max-sizes", heapMaxSize);
-        o.put("nonheap-init-sizes", nonheapInitSize);
-        o.put("nonheap-used-sizes", nonheapUsedSize);
-        o.put("nonheap-committed-sizes", nonheapCommittedSize);
-        o.put("nonheap-max-sizes", nonheapMaxSize);
-        o.put("thread-counts", threadCount);
-        o.put("peak-thread-counts", peakThreadCount);
-        o.put("system-load-averages", systemLoadAverage);
-        o.put("gc-names", gcNames);
-        o.put("gc-collection-counts", gcCollectionCounts);
-        o.put("gc-collection-times", gcCollectionTimes);
-        o.put("net-payload-bytes-read", netPayloadBytesRead);
-        o.put("net-payload-bytes-written", netPayloadBytesWritten);
-        o.put("net-signaling-bytes-read", netSignalingBytesRead);
-        o.put("net-signaling-bytes-written", netSignalingBytesWritten);
-        o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
-        o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
-        o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
-        o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
-        o.put("ipc-messages-sent", ipcMessagesSent);
-        o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
-        o.put("ipc-messages-received", ipcMessagesReceived);
-        o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
+		o.put("node-id", ncConfig.nodeId);
+		o.put("os-name", osName);
+		o.put("arch", arch);
+		o.put("os-version", osVersion);
+		o.put("num-processors", nProcessors);
+		o.put("vm-name", vmName);
+		o.put("vm-version", vmVersion);
+		o.put("vm-vendor", vmVendor);
+		o.put("classpath", classpath);
+		o.put("library-path", libraryPath);
+		o.put("boot-classpath", bootClasspath);
+		o.put("input-arguments", new JSONArray(inputArguments));
+		o.put("rrd-ptr", rrdPtr);
+		o.put("heartbeat-times", hbTime);
+		o.put("heap-init-sizes", heapInitSize);
+		o.put("heap-used-sizes", heapUsedSize);
+		o.put("heap-committed-sizes", heapCommittedSize);
+		o.put("heap-max-sizes", heapMaxSize);
+		o.put("nonheap-init-sizes", nonheapInitSize);
+		o.put("nonheap-used-sizes", nonheapUsedSize);
+		o.put("nonheap-committed-sizes", nonheapCommittedSize);
+		o.put("nonheap-max-sizes", nonheapMaxSize);
+		o.put("thread-counts", threadCount);
+		o.put("peak-thread-counts", peakThreadCount);
+		o.put("system-load-averages", systemLoadAverage);
+		o.put("gc-names", gcNames);
+		o.put("gc-collection-counts", gcCollectionCounts);
+		o.put("gc-collection-times", gcCollectionTimes);
+		o.put("net-payload-bytes-read", netPayloadBytesRead);
+		o.put("net-payload-bytes-written", netPayloadBytesWritten);
+		o.put("net-signaling-bytes-read", netSignalingBytesRead);
+		o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+		o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+		o.put("dataset-net-payload-bytes-written",
+				datasetNetPayloadBytesWritten);
+		o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+		o.put("dataset-net-signaling-bytes-written",
+				datasetNetSignalingBytesWritten);
+		o.put("ipc-messages-sent", ipcMessagesSent);
+		o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
+		o.put("ipc-messages-received", ipcMessagesReceived);
+		o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
 
-        return o;
-    }
+		return o;
+	}
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index bc58d1e..5109078 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.messages.IMessage;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
@@ -57,6 +58,7 @@
             LOGGER.log(Level.WARNING, "Error in stats reporting", e);
             throw new RuntimeException(e);
         }
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 5eb851a..66186ca 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 public class JobletCleanupNotificationWork extends AbstractWork {
@@ -45,6 +46,7 @@
 
     @Override
     public void run() {
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
         final JobRun run = ccs.getActiveRunMap().get(jobId);
         Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
         if (!cleanupPendingNodes.remove(nodeId)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
index 970a45d..eef6b96 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -14,11 +14,10 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import java.util.Map;
 import java.util.logging.Level;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
@@ -35,11 +34,7 @@
 
     @Override
     protected void doRun() throws Exception {
-        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
-        NodeControllerState state = nodeMap.get(nodeId);
-        if (state != null) {
-            state.notifyHeartbeat(hbData);
-        }
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, hbData);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index c4c8873..e400fa8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -47,6 +48,7 @@
         /** triggered remotely by a NC to notify that the NC is deployed */
         DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
         dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 8e5050c..77cddf3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -50,6 +51,7 @@
                 jobletProfile.getTaskProfiles().put(taId, statistics);
             }
             run.getScheduler().notifyTaskComplete(ta);
+            HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
         } catch (HyracksException e) {
             e.printStackTrace();
         }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index e00025d..3578fd62 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
     private final List<Exception> exceptions;
@@ -38,6 +39,7 @@
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
+        HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
new file mode 100644
index 0000000..51bbc16
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.control.cc.work.utils;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+
+public class HeartbeatUtils {
+
+    public static void notifyHeartbeat(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+        Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+        NodeControllerState state = nodeMap.get(nodeId);
+        if (state != null) {
+            state.notifyHeartbeat(hbData);
+        }
+    }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 279de61..76a9562 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -62,7 +62,7 @@
         ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
         ccConfig.heartbeatPeriod = 1000;
-        ccConfig.maxHeartbeatLapsePeriods = 10;
+        ccConfig.maxHeartbeatLapsePeriods = 15;
         
         // cluster controller
         cc = new ClusterControllerService(ccConfig);