Added notification and cleanup on failure (Logical merge of -r 580:581 from trunk).

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@837 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
index bfb70ea..c83e333 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -17,5 +17,5 @@
 public interface IJobletEventListener {
     public void jobletStart();
 
-    public void jobletFinish();
+    public void jobletFinish(JobStatus status);
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 414cf12..548c963 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -15,21 +15,24 @@
 package edu.uci.ics.hyracks.control.cc.remote.ops;
 
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
 
 public class JobCompleteNotifier implements RemoteOp<Void> {
     private String nodeId;
     private JobId jobId;
+    private JobStatus status;
 
-    public JobCompleteNotifier(String nodeId, JobId jobId) {
+    public JobCompleteNotifier(String nodeId, JobId jobId, JobStatus status) {
         this.nodeId = nodeId;
         this.jobId = jobId;
+        this.status = status;
     }
 
     @Override
     public Void execute(INodeController node) throws Exception {
-        node.cleanUpJob(jobId);
+        node.cleanUpJob(jobId, status);
         return null;
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index e86741e..c5df7bf 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -462,7 +462,8 @@
         for (TaskCluster tc : inProgressTaskClusters) {
             abortTaskCluster(findLastTaskClusterAttempt(tc));
         }
-        jobRun.setStatus(JobStatus.FAILURE, exception);
+        inProgressTaskClusters.clear();
+        ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exception));
     }
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index d792aa6..006874b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -51,7 +51,7 @@
         final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
         int i = 0;
         for (String n : targetNodes) {
-            jcns[i++] = new JobCompleteNotifier(n, jobId);
+            jcns[i++] = new JobCompleteNotifier(n, jobId, status);
             NodeControllerState ncs = ccs.getNodeMap().get(n);
             if (ncs != null) {
                 ncs.getActiveJobIds().remove(jobId);
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index efaa921..313f92e 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
@@ -37,7 +38,7 @@
 
     public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
 
-    public void cleanUpJob(JobId jobId) throws Exception;
+    public void cleanUpJob(JobId jobId, JobStatus status) throws Exception;
 
     public void notifyRegistration(IClusterController ccs) throws Exception;
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3cd3c2f..373d8b3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -241,8 +242,8 @@
     }
 
     @Override
-    public void cleanUpJob(JobId jobId) throws Exception {
-        CleanupJobWork cjw = new CleanupJobWork(this, jobId);
+    public void cleanUpJob(JobId jobId, JobStatus status) throws Exception {
+        CleanupJobWork cjw = new CleanupJobWork(this, jobId, status);
         queue.scheduleAndSync(cjw);
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
index 7ae27ba..7fbc603 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.api.job.IJobletEventListener;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
@@ -31,9 +32,12 @@
 
     private final JobId jobId;
 
-    public CleanupJobWork(NodeControllerService ncs, JobId jobId) {
+    private JobStatus status;
+
+    public CleanupJobWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
         this.ncs = ncs;
         this.jobId = jobId;
+        this.status = status;
     }
 
     @Override
@@ -46,7 +50,7 @@
         if (joblet != null) {
             IJobletEventListener listener = joblet.getJobletEventListener();
             if (listener != null) {
-                listener.jobletFinish();
+                listener.jobletFinish(status);
             }
             ncs.getPartitionManager().unregisterPartitions(jobId);
             joblet.close();