Job abort now has atmost-one semantics
git-svn-id: https://hyracks.googlecode.com/svn/trunk@590 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 5766288..514301a 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.cc.job;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -28,11 +29,13 @@
private final List<JobAttempt> attempts;
private JobStatus status;
private Set<ConstraintExpression> constraints;
+ private Set<Integer> abortedAttempts;
public JobRun(JobPlan plan, Set<ConstraintExpression> constraints) {
this.plan = plan;
attempts = new ArrayList<JobAttempt>();
this.constraints = constraints;
+ abortedAttempts = new HashSet<Integer>();
}
public JobPlan getJobPlan() {
@@ -69,4 +72,8 @@
wait();
}
}
+
+ public synchronized boolean registerAbort(int attempt) {
+ return abortedAttempts.add(attempt);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java
index be42e22..d1d1469 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java
@@ -4,24 +4,34 @@
import java.util.UUID;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.manager.events.JobAttemptStartEvent;
import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
import edu.uci.ics.hyracks.control.cc.remote.ops.JobletAborter;
public class JobLifecycleHelper {
- public static void abortJob(ClusterControllerService ccs, UUID jobId, int attempt, Set<String> targetNodes) {
- if (!targetNodes.isEmpty()) {
- JobletAborter[] jas = new JobletAborter[targetNodes.size()];
- int i = 0;
- for (String nodeId : targetNodes) {
- jas[i++] = new JobletAborter(nodeId, jobId, attempt);
- }
- try {
- RemoteRunner.runRemote(ccs, jas, null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public static void abortJob(final ClusterControllerService ccs, final UUID jobId, final int attempt,
+ final Set<String> targetNodes) {
+ JobRun run = ccs.getRunMap().get(jobId);
+ if (run != null && run.registerAbort(attempt)) {
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ if (!targetNodes.isEmpty()) {
+ JobletAborter[] jas = new JobletAborter[targetNodes.size()];
+ int i = 0;
+ for (String nodeId : targetNodes) {
+ jas[i++] = new JobletAborter(nodeId, jobId, attempt);
+ }
+ try {
+ RemoteRunner.runRemote(ccs, jas, null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ ccs.getJobQueue().schedule(new JobAttemptStartEvent(ccs, jobId));
+ }
+ });
}
- ccs.getJobQueue().schedule(new JobAttemptStartEvent(ccs, jobId));
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java
index 9a0eff1..ecf2ea0 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java
@@ -56,11 +56,6 @@
}
}
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
- }
- });
+ JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
index 9c9d569..1f8e9d4 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
@@ -50,11 +50,6 @@
ncState.getActiveJobIds().remove(jobId);
}
}
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
- }
- });
+ JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
}
}
\ No newline at end of file