Added job-status notification to joblet listeners and job cleanup on failure (logical merge of -r 580:581 from trunk).
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@837 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
index bfb70ea..c83e333 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -17,5 +17,5 @@
public interface IJobletEventListener {
public void jobletStart();
- public void jobletFinish();
+ public void jobletFinish(JobStatus status);
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
index 414cf12..548c963 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/JobCompleteNotifier.java
@@ -15,21 +15,24 @@
package edu.uci.ics.hyracks.control.cc.remote.ops;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.remote.RemoteOp;
import edu.uci.ics.hyracks.control.common.base.INodeController;
public class JobCompleteNotifier implements RemoteOp<Void> {
private String nodeId;
private JobId jobId;
+ private JobStatus status;
- public JobCompleteNotifier(String nodeId, JobId jobId) {
+ public JobCompleteNotifier(String nodeId, JobId jobId, JobStatus status) {
this.nodeId = nodeId;
this.jobId = jobId;
+ this.status = status;
}
@Override
public Void execute(INodeController node) throws Exception {
- node.cleanUpJob(jobId);
+ node.cleanUpJob(jobId, status);
return null;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index e86741e..c5df7bf 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -462,7 +462,8 @@
for (TaskCluster tc : inProgressTaskClusters) {
abortTaskCluster(findLastTaskClusterAttempt(tc));
}
- jobRun.setStatus(JobStatus.FAILURE, exception);
+ inProgressTaskClusters.clear();
+ ccs.getWorkQueue().schedule(new JobCleanupWork(ccs, jobRun.getJobId(), JobStatus.FAILURE, exception));
}
private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index d792aa6..006874b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -51,7 +51,7 @@
final JobCompleteNotifier[] jcns = new JobCompleteNotifier[targetNodes.size()];
int i = 0;
for (String n : targetNodes) {
- jcns[i++] = new JobCompleteNotifier(n, jobId);
+ jcns[i++] = new JobCompleteNotifier(n, jobId, status);
NodeControllerState ncs = ccs.getNodeMap().get(n);
if (ncs != null) {
ncs.getActiveJobIds().remove(jobId);
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index efaa921..313f92e 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
@@ -37,7 +38,7 @@
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
- public void cleanUpJob(JobId jobId) throws Exception;
+ public void cleanUpJob(JobId jobId, JobStatus status) throws Exception;
public void notifyRegistration(IClusterController ccs) throws Exception;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3cd3c2f..373d8b3 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -241,8 +242,8 @@
}
@Override
- public void cleanUpJob(JobId jobId) throws Exception {
- CleanupJobWork cjw = new CleanupJobWork(this, jobId);
+ public void cleanUpJob(JobId jobId, JobStatus status) throws Exception {
+ CleanupJobWork cjw = new CleanupJobWork(this, jobId, status);
queue.scheduleAndSync(cjw);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
index 7ae27ba..7fbc603 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
@@ -31,9 +32,12 @@
private final JobId jobId;
- public CleanupJobWork(NodeControllerService ncs, JobId jobId) {
+ private JobStatus status;
+
+ public CleanupJobWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
this.ncs = ncs;
this.jobId = jobId;
+ this.status = status;
}
@Override
@@ -46,7 +50,7 @@
if (joblet != null) {
IJobletEventListener listener = joblet.getJobletEventListener();
if (listener != null) {
- listener.jobletFinish();
+ listener.jobletFinish(status);
}
ncs.getPartitionManager().unregisterPartitions(jobId);
joblet.close();