Fixed NPE in Stagelet Completion. Added success flag to finishJoblet() call

git-svn-id: https://hyracks.googlecode.com/svn/trunk@577 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
index d0269fc..a90eaec 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
@@ -43,7 +43,7 @@
 
     public void abortJoblet(UUID jobId, int attempt) throws Exception;
 
-    public void cleanUpJob(UUID jobId) throws Exception;
+    public void cleanUpJob(UUID jobId, boolean success) throws Exception;
 
     public void startStage(UUID jobId, UUID stageId) throws Exception;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
index bfb70ea..4827c25 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -17,5 +17,5 @@
 public interface IJobletEventListener {
     public void jobletStart();
 
-    public void jobletFinish();
+    public void jobletFinish(boolean success);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
index c4ed077..9ec8760 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCleanupEvent.java
@@ -49,7 +49,7 @@
         final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
         int i = 0;
         for (String n : targetNodes) {
-            jcns[i++] = new JobCompleteNotifier(n, jobId);
+            jcns[i++] = new JobCompleteNotifier(n, jobId, status == JobStatus.TERMINATED);
         }
         ccs.getExecutor().execute(new Runnable() {
             @Override
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
index 34dbeb4..8742ff3 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
@@ -30,16 +30,12 @@
 public class StageletFailureEvent implements Runnable {
     private final ClusterControllerService ccs;
     private final UUID jobId;
-    private final UUID stageId;
     private final int attempt;
-    private final String nodeId;
 
     public StageletFailureEvent(ClusterControllerService ccs, UUID jobId, UUID stageId, int attempt, String nodeId) {
         this.ccs = ccs;
         this.jobId = jobId;
-        this.stageId = stageId;
         this.attempt = attempt;
-        this.nodeId = nodeId;
     }
 
     @Override
@@ -50,7 +46,10 @@
         final Set<String> targetNodes = new HashSet<String>(ja.getParticipatingNodeIds());
         Map<String, NodeControllerState> nodeMap = new HashMap<String, NodeControllerState>();
         for (String nodeId : targetNodes) {
-            nodeMap.get(nodeId).getActiveJobIds().remove(jobId);
+            NodeControllerState ncState = nodeMap.get(nodeId);
+            if (ncState != null) {
+                ncState.getActiveJobIds().remove(jobId);
+            }
         }
         ccs.getExecutor().execute(new Runnable() {
             @Override
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 81a8610..563e883 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -22,15 +22,17 @@
 public class JobCompleteNotifier implements RemoteOp<Void> {
     private String nodeId;
     private UUID jobId;
+    private boolean status;
 
-    public JobCompleteNotifier(String nodeId, UUID jobId) {
+    public JobCompleteNotifier(String nodeId, UUID jobId, boolean status) {
         this.nodeId = nodeId;
         this.jobId = jobId;
+        this.status = status;
     }
 
     @Override
     public Void execute(INodeController node) throws Exception {
-        node.cleanUpJob(jobId);
+        node.cleanUpJob(jobId, status);
         return null;
     }
 
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b2f2364..163d792 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -466,7 +466,7 @@
     }
 
     @Override
-    public void cleanUpJob(UUID jobId) throws Exception {
+    public void cleanUpJob(UUID jobId, boolean success) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
@@ -474,7 +474,7 @@
         if (joblet != null) {
             IJobletEventListener listener = joblet.getJobletEventListener();
             if (listener != null) {
-                listener.jobletFinish();
+                listener.jobletFinish(success);
             }
             joblet.close();
         }