Fixed NPE in Stagelet Completion. Added success flag to finishJoblet() call
git-svn-id: https://hyracks.googlecode.com/svn/trunk@577 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
index d0269fc..a90eaec 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
@@ -43,7 +43,7 @@
public void abortJoblet(UUID jobId, int attempt) throws Exception;
- public void cleanUpJob(UUID jobId) throws Exception;
+ public void cleanUpJob(UUID jobId, boolean success) throws Exception;
public void startStage(UUID jobId, UUID stageId) throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
index bfb70ea..4827c25 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -17,5 +17,5 @@
public interface IJobletEventListener {
public void jobletStart();
- public void jobletFinish();
+ public void jobletFinish(boolean success);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index c4ed077..9ec8760 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -49,7 +49,7 @@
final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
int i = 0;
for (String n : targetNodes) {
- jcns[i++] = new JobCompleteNotifier(n, jobId);
+ jcns[i++] = new JobCompleteNotifier(n, jobId, status == JobStatus.TERMINATED);
}
ccs.getExecutor().execute(new Runnable() {
@Override
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
index 34dbeb4..8742ff3 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
@@ -30,16 +30,12 @@
public class StageletFailureEvent implements Runnable {
private final ClusterControllerService ccs;
private final UUID jobId;
- private final UUID stageId;
private final int attempt;
- private final String nodeId;
public StageletFailureEvent(ClusterControllerService ccs, UUID jobId, UUID stageId, int attempt, String nodeId) {
this.ccs = ccs;
this.jobId = jobId;
- this.stageId = stageId;
this.attempt = attempt;
- this.nodeId = nodeId;
}
@Override
@@ -50,7 +46,10 @@
final Set<String> targetNodes = new HashSet<String>(ja.getParticipatingNodeIds());
Map<String, NodeControllerState> nodeMap = new HashMap<String, NodeControllerState>();
for (String nodeId : targetNodes) {
- nodeMap.get(nodeId).getActiveJobIds().remove(jobId);
+ NodeControllerState ncState = nodeMap.get(nodeId);
+ if (ncState != null) {
+ ncState.getActiveJobIds().remove(jobId);
+ }
}
ccs.getExecutor().execute(new Runnable() {
@Override
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 81a8610..563e883 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -22,15 +22,17 @@
public class JobCompleteNotifier implements RemoteOp<Void> {
private String nodeId;
private UUID jobId;
+ private boolean status;
- public JobCompleteNotifier(String nodeId, UUID jobId) {
+ public JobCompleteNotifier(String nodeId, UUID jobId, boolean status) {
this.nodeId = nodeId;
this.jobId = jobId;
+ this.status = status;
}
@Override
public Void execute(INodeController node) throws Exception {
- node.cleanUpJob(jobId);
+ node.cleanUpJob(jobId, status);
return null;
}
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b2f2364..163d792 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -466,7 +466,7 @@
}
@Override
- public void cleanUpJob(UUID jobId) throws Exception {
+ public void cleanUpJob(UUID jobId, boolean success) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
@@ -474,7 +474,7 @@
if (joblet != null) {
IJobletEventListener listener = joblet.getJobletEventListener();
if (listener != null) {
- listener.jobletFinish();
+ listener.jobletFinish(success);
}
joblet.close();
}