[ASTERIXDB-2197][FAIL] Abort Job on Failures in NotifyTaskCompleteWork

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Abort the job if any unexpected failure happens on
  NotifyTaskCompleteWork to ensure that the job will
  not be waiting forever.

Change-Id: I60c911c7aae872ee6b94e68efa53638207c0180d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2218
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 01201a6..ab7a3db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -642,27 +642,32 @@
         return doomed;
     }
 
-    public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
-        TaskAttemptId taId = ta.getTaskAttemptId();
-        TaskCluster tc = ta.getTask().getTaskCluster();
-        TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
-        if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) {
-            LOGGER.warning(
-                    "Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
-            return;
-        }
-        TaskAttempt.TaskStatus taStatus = ta.getStatus();
-        if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
-            LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
-            return;
-        }
-        ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
-        ta.setEndTime(System.currentTimeMillis());
-        if (lastAttempt.decrementPendingTasksCounter() == 0) {
-            lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
-            lastAttempt.setEndTime(System.currentTimeMillis());
-            inProgressTaskClusters.remove(tc);
-            startRunnableActivityClusters();
+    public void notifyTaskComplete(TaskAttempt ta) {
+        try {
+            TaskAttemptId taId = ta.getTaskAttemptId();
+            TaskCluster tc = ta.getTask().getTaskCluster();
+            TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+            if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) {
+                LOGGER.warning(() -> "Ignoring task complete notification: " + taId + " -- Current last attempt = "
+                        + lastAttempt);
+                return;
+            }
+            TaskAttempt.TaskStatus taStatus = ta.getStatus();
+            if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
+                LOGGER.warning(() -> "Spurious task complete notification: " + taId + " Current state = " + taStatus);
+                return;
+            }
+            ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+            ta.setEndTime(System.currentTimeMillis());
+            if (lastAttempt.decrementPendingTasksCounter() == 0) {
+                lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+                lastAttempt.setEndTime(System.currentTimeMillis());
+                inProgressTaskClusters.remove(tc);
+                startRunnableActivityClusters();
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting job " + jobRun.getJobId());
+            abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
 
@@ -724,6 +729,7 @@
                     ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
             startRunnableActivityClusters();
         } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting job " + jobRun.getJobId());
             abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index f4f2f52..e2f8b0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobManager;
@@ -42,23 +41,19 @@
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        try {
-            IJobManager jobManager = ccs.getJobManager();
-            JobRun run = jobManager.get(jobId);
-            if (statistics != null) {
-                JobProfile jobProfile = run.getJobProfile();
-                Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
-                JobletProfile jobletProfile = jobletProfiles.get(nodeId);
-                if (jobletProfile == null) {
-                    jobletProfile = new JobletProfile(nodeId);
-                    jobletProfiles.put(nodeId, jobletProfile);
-                }
-                jobletProfile.getTaskProfiles().put(taId, statistics);
+        IJobManager jobManager = ccs.getJobManager();
+        JobRun run = jobManager.get(jobId);
+        if (statistics != null) {
+            JobProfile jobProfile = run.getJobProfile();
+            Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
+            JobletProfile jobletProfile = jobletProfiles.get(nodeId);
+            if (jobletProfile == null) {
+                jobletProfile = new JobletProfile(nodeId);
+                jobletProfiles.put(nodeId, jobletProfile);
             }
-            run.getExecutor().notifyTaskComplete(ta);
-        } catch (HyracksException e) {
-            e.printStackTrace();
+            jobletProfile.getTaskProfiles().put(taId, statistics);
         }
+        run.getExecutor().notifyTaskComplete(ta);
     }
 
     @Override