notifyHeartbeat in some non-heartbeat events
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index 67ba2b6..3ee036d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -33,276 +33,281 @@
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
public class NodeControllerState {
- private static final int RRD_SIZE = 720;
+ private static final int RRD_SIZE = 720;
- private final INodeController nodeController;
+ private final INodeController nodeController;
- private final NCConfig ncConfig;
+ private final NCConfig ncConfig;
- private final NetworkAddress dataPort;
+ private final NetworkAddress dataPort;
- private final NetworkAddress datasetPort;
+ private final NetworkAddress datasetPort;
- private final Set<JobId> activeJobIds;
+ private final Set<JobId> activeJobIds;
- private final String osName;
+ private final String osName;
- private final String arch;
+ private final String arch;
- private final String osVersion;
+ private final String osVersion;
- private final int nProcessors;
+ private final int nProcessors;
- private final String vmName;
+ private final String vmName;
- private final String vmVersion;
+ private final String vmVersion;
- private final String vmVendor;
+ private final String vmVendor;
- private final String classpath;
+ private final String classpath;
- private final String libraryPath;
+ private final String libraryPath;
- private final String bootClasspath;
+ private final String bootClasspath;
- private final List<String> inputArguments;
+ private final List<String> inputArguments;
- private final Map<String, String> systemProperties;
+ private final Map<String, String> systemProperties;
- private final HeartbeatSchema hbSchema;
+ private final HeartbeatSchema hbSchema;
- private final long[] hbTime;
+ private final long[] hbTime;
- private final long[] heapInitSize;
+ private final long[] heapInitSize;
- private final long[] heapUsedSize;
+ private final long[] heapUsedSize;
- private final long[] heapCommittedSize;
+ private final long[] heapCommittedSize;
- private final long[] heapMaxSize;
+ private final long[] heapMaxSize;
- private final long[] nonheapInitSize;
+ private final long[] nonheapInitSize;
- private final long[] nonheapUsedSize;
+ private final long[] nonheapUsedSize;
- private final long[] nonheapCommittedSize;
+ private final long[] nonheapCommittedSize;
- private final long[] nonheapMaxSize;
+ private final long[] nonheapMaxSize;
- private final int[] threadCount;
+ private final int[] threadCount;
- private final int[] peakThreadCount;
+ private final int[] peakThreadCount;
- private final double[] systemLoadAverage;
+ private final double[] systemLoadAverage;
- private final String[] gcNames;
+ private final String[] gcNames;
- private final long[][] gcCollectionCounts;
+ private final long[][] gcCollectionCounts;
- private final long[][] gcCollectionTimes;
+ private final long[][] gcCollectionTimes;
- private final long[] netPayloadBytesRead;
+ private final long[] netPayloadBytesRead;
- private final long[] netPayloadBytesWritten;
+ private final long[] netPayloadBytesWritten;
- private final long[] netSignalingBytesRead;
+ private final long[] netSignalingBytesRead;
- private final long[] netSignalingBytesWritten;
+ private final long[] netSignalingBytesWritten;
- private final long[] datasetNetPayloadBytesRead;
+ private final long[] datasetNetPayloadBytesRead;
- private final long[] datasetNetPayloadBytesWritten;
+ private final long[] datasetNetPayloadBytesWritten;
- private final long[] datasetNetSignalingBytesRead;
+ private final long[] datasetNetSignalingBytesRead;
- private final long[] datasetNetSignalingBytesWritten;
+ private final long[] datasetNetSignalingBytesWritten;
- private final long[] ipcMessagesSent;
+ private final long[] ipcMessagesSent;
- private final long[] ipcMessageBytesSent;
+ private final long[] ipcMessageBytesSent;
- private final long[] ipcMessagesReceived;
+ private final long[] ipcMessagesReceived;
- private final long[] ipcMessageBytesReceived;
+ private final long[] ipcMessageBytesReceived;
- private int rrdPtr;
+ private int rrdPtr;
- private int lastHeartbeatDuration;
+ private int lastHeartbeatDuration;
- public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
- this.nodeController = nodeController;
- ncConfig = reg.getNCConfig();
- dataPort = reg.getDataPort();
- datasetPort = reg.getDatasetPort();
- activeJobIds = new HashSet<JobId>();
+ public NodeControllerState(INodeController nodeController,
+ NodeRegistration reg) {
+ this.nodeController = nodeController;
+ ncConfig = reg.getNCConfig();
+ dataPort = reg.getDataPort();
+ datasetPort = reg.getDatasetPort();
+ activeJobIds = new HashSet<JobId>();
- osName = reg.getOSName();
- arch = reg.getArch();
- osVersion = reg.getOSVersion();
- nProcessors = reg.getNProcessors();
- vmName = reg.getVmName();
- vmVersion = reg.getVmVersion();
- vmVendor = reg.getVmVendor();
- classpath = reg.getClasspath();
- libraryPath = reg.getLibraryPath();
- bootClasspath = reg.getBootClasspath();
- inputArguments = reg.getInputArguments();
- systemProperties = reg.getSystemProperties();
+ osName = reg.getOSName();
+ arch = reg.getArch();
+ osVersion = reg.getOSVersion();
+ nProcessors = reg.getNProcessors();
+ vmName = reg.getVmName();
+ vmVersion = reg.getVmVersion();
+ vmVendor = reg.getVmVendor();
+ classpath = reg.getClasspath();
+ libraryPath = reg.getLibraryPath();
+ bootClasspath = reg.getBootClasspath();
+ inputArguments = reg.getInputArguments();
+ systemProperties = reg.getSystemProperties();
- hbSchema = reg.getHeartbeatSchema();
+ hbSchema = reg.getHeartbeatSchema();
- hbTime = new long[RRD_SIZE];
- heapInitSize = new long[RRD_SIZE];
- heapUsedSize = new long[RRD_SIZE];
- heapCommittedSize = new long[RRD_SIZE];
- heapMaxSize = new long[RRD_SIZE];
- nonheapInitSize = new long[RRD_SIZE];
- nonheapUsedSize = new long[RRD_SIZE];
- nonheapCommittedSize = new long[RRD_SIZE];
- nonheapMaxSize = new long[RRD_SIZE];
- threadCount = new int[RRD_SIZE];
- peakThreadCount = new int[RRD_SIZE];
- systemLoadAverage = new double[RRD_SIZE];
- GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
- int gcN = gcInfos.length;
- gcNames = new String[gcN];
- for (int i = 0; i < gcN; ++i) {
- gcNames[i] = gcInfos[i].getName();
- }
- gcCollectionCounts = new long[gcN][RRD_SIZE];
- gcCollectionTimes = new long[gcN][RRD_SIZE];
- netPayloadBytesRead = new long[RRD_SIZE];
- netPayloadBytesWritten = new long[RRD_SIZE];
- netSignalingBytesRead = new long[RRD_SIZE];
- netSignalingBytesWritten = new long[RRD_SIZE];
- datasetNetPayloadBytesRead = new long[RRD_SIZE];
- datasetNetPayloadBytesWritten = new long[RRD_SIZE];
- datasetNetSignalingBytesRead = new long[RRD_SIZE];
- datasetNetSignalingBytesWritten = new long[RRD_SIZE];
- ipcMessagesSent = new long[RRD_SIZE];
- ipcMessageBytesSent = new long[RRD_SIZE];
- ipcMessagesReceived = new long[RRD_SIZE];
- ipcMessageBytesReceived = new long[RRD_SIZE];
+ hbTime = new long[RRD_SIZE];
+ heapInitSize = new long[RRD_SIZE];
+ heapUsedSize = new long[RRD_SIZE];
+ heapCommittedSize = new long[RRD_SIZE];
+ heapMaxSize = new long[RRD_SIZE];
+ nonheapInitSize = new long[RRD_SIZE];
+ nonheapUsedSize = new long[RRD_SIZE];
+ nonheapCommittedSize = new long[RRD_SIZE];
+ nonheapMaxSize = new long[RRD_SIZE];
+ threadCount = new int[RRD_SIZE];
+ peakThreadCount = new int[RRD_SIZE];
+ systemLoadAverage = new double[RRD_SIZE];
+ GarbageCollectorInfo[] gcInfos = hbSchema.getGarbageCollectorInfos();
+ int gcN = gcInfos.length;
+ gcNames = new String[gcN];
+ for (int i = 0; i < gcN; ++i) {
+ gcNames[i] = gcInfos[i].getName();
+ }
+ gcCollectionCounts = new long[gcN][RRD_SIZE];
+ gcCollectionTimes = new long[gcN][RRD_SIZE];
+ netPayloadBytesRead = new long[RRD_SIZE];
+ netPayloadBytesWritten = new long[RRD_SIZE];
+ netSignalingBytesRead = new long[RRD_SIZE];
+ netSignalingBytesWritten = new long[RRD_SIZE];
+ datasetNetPayloadBytesRead = new long[RRD_SIZE];
+ datasetNetPayloadBytesWritten = new long[RRD_SIZE];
+ datasetNetSignalingBytesRead = new long[RRD_SIZE];
+ datasetNetSignalingBytesWritten = new long[RRD_SIZE];
+ ipcMessagesSent = new long[RRD_SIZE];
+ ipcMessageBytesSent = new long[RRD_SIZE];
+ ipcMessagesReceived = new long[RRD_SIZE];
+ ipcMessageBytesReceived = new long[RRD_SIZE];
- rrdPtr = 0;
- }
+ rrdPtr = 0;
+ }
- public void notifyHeartbeat(HeartbeatData hbData) {
- lastHeartbeatDuration = 0;
-
- hbTime[rrdPtr] = System.currentTimeMillis();
- heapInitSize[rrdPtr] = hbData.heapInitSize;
- heapUsedSize[rrdPtr] = hbData.heapUsedSize;
- heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
- heapMaxSize[rrdPtr] = hbData.heapMaxSize;
- nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
- nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
- nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
- nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
- threadCount[rrdPtr] = hbData.threadCount;
- peakThreadCount[rrdPtr] = hbData.peakThreadCount;
- systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
- int gcN = hbSchema.getGarbageCollectorInfos().length;
- for (int i = 0; i < gcN; ++i) {
- gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
- gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
- }
- netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
- netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
- netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
- netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
- datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
- datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
- datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
- datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
- ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
- ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
- ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
- ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
- rrdPtr = (rrdPtr + 1) % RRD_SIZE;
- }
+ public void notifyHeartbeat(HeartbeatData hbData) {
+ lastHeartbeatDuration = 0;
+ hbTime[rrdPtr] = System.currentTimeMillis();
+ if (hbData != null) {
+ heapInitSize[rrdPtr] = hbData.heapInitSize;
+ heapUsedSize[rrdPtr] = hbData.heapUsedSize;
+ heapCommittedSize[rrdPtr] = hbData.heapCommittedSize;
+ heapMaxSize[rrdPtr] = hbData.heapMaxSize;
+ nonheapInitSize[rrdPtr] = hbData.nonheapInitSize;
+ nonheapUsedSize[rrdPtr] = hbData.nonheapUsedSize;
+ nonheapCommittedSize[rrdPtr] = hbData.nonheapCommittedSize;
+ nonheapMaxSize[rrdPtr] = hbData.nonheapMaxSize;
+ threadCount[rrdPtr] = hbData.threadCount;
+ peakThreadCount[rrdPtr] = hbData.peakThreadCount;
+ systemLoadAverage[rrdPtr] = hbData.systemLoadAverage;
+ int gcN = hbSchema.getGarbageCollectorInfos().length;
+ for (int i = 0; i < gcN; ++i) {
+ gcCollectionCounts[i][rrdPtr] = hbData.gcCollectionCounts[i];
+ gcCollectionTimes[i][rrdPtr] = hbData.gcCollectionTimes[i];
+ }
+ netPayloadBytesRead[rrdPtr] = hbData.netPayloadBytesRead;
+ netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
+ netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
+ netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
+ datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
+ datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
+ datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
+ datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
+ ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
+ ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
+ ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
+ ipcMessageBytesReceived[rrdPtr] = hbData.ipcMessageBytesReceived;
+ }
+ rrdPtr = (rrdPtr + 1) % RRD_SIZE;
+ }
- public int incrementLastHeartbeatDuration() {
- return lastHeartbeatDuration++;
- }
+ public int incrementLastHeartbeatDuration() {
+ return lastHeartbeatDuration++;
+ }
- public int getLastHeartbeatDuration() {
- return lastHeartbeatDuration;
- }
+ public int getLastHeartbeatDuration() {
+ return lastHeartbeatDuration;
+ }
- public INodeController getNodeController() {
- return nodeController;
- }
+ public INodeController getNodeController() {
+ return nodeController;
+ }
- public NCConfig getNCConfig() {
- return ncConfig;
- }
+ public NCConfig getNCConfig() {
+ return ncConfig;
+ }
- public Set<JobId> getActiveJobIds() {
- return activeJobIds;
- }
+ public Set<JobId> getActiveJobIds() {
+ return activeJobIds;
+ }
- public NetworkAddress getDataPort() {
- return dataPort;
- }
+ public NetworkAddress getDataPort() {
+ return dataPort;
+ }
- public NetworkAddress getDatasetPort() {
- return datasetPort;
- }
+ public NetworkAddress getDatasetPort() {
+ return datasetPort;
+ }
- public JSONObject toSummaryJSON() throws JSONException {
- JSONObject o = new JSONObject();
- o.put("node-id", ncConfig.nodeId);
- o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
- o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+ public JSONObject toSummaryJSON() throws JSONException {
+ JSONObject o = new JSONObject();
+ o.put("node-id", ncConfig.nodeId);
+ o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
+ o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1)
+ % RRD_SIZE]);
- return o;
- }
+ return o;
+ }
- public JSONObject toDetailedJSON() throws JSONException {
- JSONObject o = new JSONObject();
+ public JSONObject toDetailedJSON() throws JSONException {
+ JSONObject o = new JSONObject();
- o.put("node-id", ncConfig.nodeId);
- o.put("os-name", osName);
- o.put("arch", arch);
- o.put("os-version", osVersion);
- o.put("num-processors", nProcessors);
- o.put("vm-name", vmName);
- o.put("vm-version", vmVersion);
- o.put("vm-vendor", vmVendor);
- o.put("classpath", classpath);
- o.put("library-path", libraryPath);
- o.put("boot-classpath", bootClasspath);
- o.put("input-arguments", new JSONArray(inputArguments));
- o.put("rrd-ptr", rrdPtr);
- o.put("heartbeat-times", hbTime);
- o.put("heap-init-sizes", heapInitSize);
- o.put("heap-used-sizes", heapUsedSize);
- o.put("heap-committed-sizes", heapCommittedSize);
- o.put("heap-max-sizes", heapMaxSize);
- o.put("nonheap-init-sizes", nonheapInitSize);
- o.put("nonheap-used-sizes", nonheapUsedSize);
- o.put("nonheap-committed-sizes", nonheapCommittedSize);
- o.put("nonheap-max-sizes", nonheapMaxSize);
- o.put("thread-counts", threadCount);
- o.put("peak-thread-counts", peakThreadCount);
- o.put("system-load-averages", systemLoadAverage);
- o.put("gc-names", gcNames);
- o.put("gc-collection-counts", gcCollectionCounts);
- o.put("gc-collection-times", gcCollectionTimes);
- o.put("net-payload-bytes-read", netPayloadBytesRead);
- o.put("net-payload-bytes-written", netPayloadBytesWritten);
- o.put("net-signaling-bytes-read", netSignalingBytesRead);
- o.put("net-signaling-bytes-written", netSignalingBytesWritten);
- o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
- o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
- o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
- o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
- o.put("ipc-messages-sent", ipcMessagesSent);
- o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
- o.put("ipc-messages-received", ipcMessagesReceived);
- o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
+ o.put("node-id", ncConfig.nodeId);
+ o.put("os-name", osName);
+ o.put("arch", arch);
+ o.put("os-version", osVersion);
+ o.put("num-processors", nProcessors);
+ o.put("vm-name", vmName);
+ o.put("vm-version", vmVersion);
+ o.put("vm-vendor", vmVendor);
+ o.put("classpath", classpath);
+ o.put("library-path", libraryPath);
+ o.put("boot-classpath", bootClasspath);
+ o.put("input-arguments", new JSONArray(inputArguments));
+ o.put("rrd-ptr", rrdPtr);
+ o.put("heartbeat-times", hbTime);
+ o.put("heap-init-sizes", heapInitSize);
+ o.put("heap-used-sizes", heapUsedSize);
+ o.put("heap-committed-sizes", heapCommittedSize);
+ o.put("heap-max-sizes", heapMaxSize);
+ o.put("nonheap-init-sizes", nonheapInitSize);
+ o.put("nonheap-used-sizes", nonheapUsedSize);
+ o.put("nonheap-committed-sizes", nonheapCommittedSize);
+ o.put("nonheap-max-sizes", nonheapMaxSize);
+ o.put("thread-counts", threadCount);
+ o.put("peak-thread-counts", peakThreadCount);
+ o.put("system-load-averages", systemLoadAverage);
+ o.put("gc-names", gcNames);
+ o.put("gc-collection-counts", gcCollectionCounts);
+ o.put("gc-collection-times", gcCollectionTimes);
+ o.put("net-payload-bytes-read", netPayloadBytesRead);
+ o.put("net-payload-bytes-written", netPayloadBytesWritten);
+ o.put("net-signaling-bytes-read", netSignalingBytesRead);
+ o.put("net-signaling-bytes-written", netSignalingBytesWritten);
+ o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
+ o.put("dataset-net-payload-bytes-written",
+ datasetNetPayloadBytesWritten);
+ o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
+ o.put("dataset-net-signaling-bytes-written",
+ datasetNetSignalingBytesWritten);
+ o.put("ipc-messages-sent", ipcMessagesSent);
+ o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
+ o.put("ipc-messages-received", ipcMessagesReceived);
+ o.put("ipc-message-bytes-received", ipcMessageBytesReceived);
- return o;
- }
+ return o;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index bc58d1e..5109078 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.messages.IMessage;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -57,6 +58,7 @@
LOGGER.log(Level.WARNING, "Error in stats reporting", e);
throw new RuntimeException(e);
}
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 5eb851a..66186ca 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
public class JobletCleanupNotificationWork extends AbstractWork {
@@ -45,6 +46,7 @@
@Override
public void run() {
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
final JobRun run = ccs.getActiveRunMap().get(jobId);
Set<String> cleanupPendingNodes = run.getCleanupPendingNodeIds();
if (!cleanupPendingNodes.remove(nodeId)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
index 970a45d..eef6b96 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NodeHeartbeatWork.java
@@ -14,11 +14,10 @@
*/
package edu.uci.ics.hyracks.control.cc.work;
-import java.util.Map;
import java.util.logging.Level;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
@@ -35,11 +34,7 @@
@Override
protected void doRun() throws Exception {
- Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
- NodeControllerState state = nodeMap.get(nodeId);
- if (state != null) {
- state.notifyHeartbeat(hbData);
- }
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, hbData);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
index c4c8873..e400fa8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyDeployBinaryWork.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
import edu.uci.ics.hyracks.control.common.deployment.DeploymentStatus;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -47,6 +48,7 @@
/** triggered remotely by a NC to notify that the NC is deployed */
DeploymentRun dRun = ccs.getDeploymentRun(deploymentId);
dRun.notifyDeploymentStatus(nodeId, deploymentStatus);
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 8e5050c..77cddf3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -50,6 +51,7 @@
jobletProfile.getTaskProfiles().put(taId, statistics);
}
run.getScheduler().notifyTaskComplete(ta);
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
} catch (HyracksException e) {
e.printStackTrace();
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index e00025d..3578fd62 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
+import edu.uci.ics.hyracks.control.cc.work.utils.HeartbeatUtils;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
private final List<Exception> exceptions;
@@ -38,6 +39,7 @@
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
run.getScheduler().notifyTaskFailure(ta, ac, exceptions);
+ HeartbeatUtils.notifyHeartbeat(ccs, nodeId, null);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
new file mode 100644
index 0000000..51bbc16
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/utils/HeartbeatUtils.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.control.cc.work.utils;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+
+public class HeartbeatUtils {
+
+ public static void notifyHeartbeat(ClusterControllerService ccs, String nodeId, HeartbeatData hbData) {
+ Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
+ NodeControllerState state = nodeMap.get(nodeId);
+ if (state != null) {
+ state.notifyHeartbeat(hbData);
+ }
+ }
+}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index 279de61..76a9562 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -62,7 +62,7 @@
ccConfig.jobHistorySize = 1;
ccConfig.profileDumpPeriod = -1;
ccConfig.heartbeatPeriod = 1000;
- ccConfig.maxHeartbeatLapsePeriods = 10;
+ ccConfig.maxHeartbeatLapsePeriods = 15;
// cluster controller
cc = new ClusterControllerService(ccConfig);