Job abort now has atmost-one semantics

git-svn-id: https://hyracks.googlecode.com/svn/trunk@590 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 5766288..514301a 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.control.cc.job;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -28,11 +29,13 @@
     private final List<JobAttempt> attempts;
     private JobStatus status;
     private Set<ConstraintExpression> constraints;
+    private Set<Integer> abortedAttempts;
 
     public JobRun(JobPlan plan, Set<ConstraintExpression> constraints) {
         this.plan = plan;
         attempts = new ArrayList<JobAttempt>();
         this.constraints = constraints;
+        abortedAttempts = new HashSet<Integer>();
     }
 
     public JobPlan getJobPlan() {
@@ -69,4 +72,8 @@
             wait();
         }
     }
+
+    public synchronized boolean registerAbort(int attempt) {
+        return abortedAttempts.add(attempt);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java
index be42e22..d1d1469 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/JobLifecycleHelper.java
@@ -4,24 +4,34 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.job.manager.events.JobAttemptStartEvent;
 import edu.uci.ics.hyracks.control.cc.remote.RemoteRunner;
 import edu.uci.ics.hyracks.control.cc.remote.ops.JobletAborter;
 
 public class JobLifecycleHelper {
-    public static void abortJob(ClusterControllerService ccs, UUID jobId, int attempt, Set<String> targetNodes) {
-        if (!targetNodes.isEmpty()) {
-            JobletAborter[] jas = new JobletAborter[targetNodes.size()];
-            int i = 0;
-            for (String nodeId : targetNodes) {
-                jas[i++] = new JobletAborter(nodeId, jobId, attempt);
-            }
-            try {
-                RemoteRunner.runRemote(ccs, jas, null);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+    public static void abortJob(final ClusterControllerService ccs, final UUID jobId, final int attempt,
+            final Set<String> targetNodes) {
+        JobRun run = ccs.getRunMap().get(jobId);
+        if (run != null && run.registerAbort(attempt)) {
+            ccs.getExecutor().execute(new Runnable() {
+                @Override
+                public void run() {
+                    if (!targetNodes.isEmpty()) {
+                        JobletAborter[] jas = new JobletAborter[targetNodes.size()];
+                        int i = 0;
+                        for (String nodeId : targetNodes) {
+                            jas[i++] = new JobletAborter(nodeId, jobId, attempt);
+                        }
+                        try {
+                            RemoteRunner.runRemote(ccs, jas, null);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    ccs.getJobQueue().schedule(new JobAttemptStartEvent(ccs, jobId));
+                }
+            });
         }
-        ccs.getJobQueue().schedule(new JobAttemptStartEvent(ccs, jobId));
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java
index 9a0eff1..ecf2ea0 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobAbortEvent.java
@@ -56,11 +56,6 @@
             }
         }
 
-        ccs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
-            }
-        });
+        JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
index 9c9d569..1f8e9d4 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/StageletFailureEvent.java
@@ -50,11 +50,6 @@
                 ncState.getActiveJobIds().remove(jobId);
             }
         }
-        ccs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
-            }
-        });
+        JobLifecycleHelper.abortJob(ccs, jobId, attempt, targetNodes);
     }
 }
\ No newline at end of file