[ASTERIXDB-2003][FAIL] Abort jobs failing during job start

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Prevent NPE or unmodifiable list in JobCleanupWork and
  JobletCleanupNotificationWork.
- Abort job if a failure happens during job start

Change-Id: If6fe4ed9084270f9f22ee4b4c71936d679c8b883
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1904
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 2150bdd..f18a917 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -521,7 +521,7 @@
         }
     }
 
-    private void abortJob(List<Exception> exceptions) {
+    public void abortJob(List<Exception> exceptions) {
         Set<TaskCluster> inProgressTaskClustersCopy = new HashSet<>(inProgressTaskClusters);
         for (TaskCluster tc : inProgressTaskClustersCopy) {
             abortTaskCluster(findLastTaskClusterAttempt(tc), TaskClusterAttempt.TaskClusterStatus.ABORTED);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index c1a7899..abf1d57 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -45,7 +45,6 @@
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
-import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -318,8 +317,12 @@
         try {
             run.getExecutor().startJob();
         } catch (Exception e) {
-            ccs.getWorkQueue().schedule(new JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
-                    Collections.singletonList(e)));
+            LOGGER.log(Level.SEVERE, "Aborting " + run.getJobId() + " due to failure during job start", e);
+            final List<Exception> exceptions = Collections.singletonList(e);
+            // fail the job then abort it
+            run.setStatus(JobStatus.FAILURE, exceptions);
+            // abort job will trigger JobCleanupWork
+            run.getExecutor().abortJob(exceptions);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 5f29981..502ac50 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -55,8 +56,12 @@
         } catch (HyracksException e) {
             // Fail the job with the caught exception during final completion.
             JobRun run = jobManager.get(jobId);
-            run.getExceptions().add(e);
-            run.setStatus(JobStatus.FAILURE, run.getExceptions());
+            List<Exception> completionException = new ArrayList<>();
+            if (run.getExceptions() != null && !run.getExceptions().isEmpty()) {
+                completionException.addAll(run.getExceptions());
+            }
+            completionException.add(0, e);
+            run.setStatus(JobStatus.FAILURE, completionException);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 6a8e631..5bf721b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -67,8 +69,12 @@
                 jobManager.finalComplete(run);
             } catch (HyracksException e) {
                 // Fail the job with the caught exception during final completion.
-                run.getExceptions().add(e);
-                run.setStatus(JobStatus.FAILURE, run.getExceptions());
+                List<Exception> completionException = new ArrayList<>();
+                if (run.getExceptions() != null && !run.getExceptions().isEmpty()) {
+                    completionException.addAll(run.getExceptions());
+                }
+                completionException.add(0, e);
+                run.setStatus(JobStatus.FAILURE, completionException);
             }
         }
     }