[ASTERIXDB-2197][FAIL] Abort Job on Failures in NotifyTaskCompleteWork
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Abort the job if any unexpected failure happens in
NotifyTaskCompleteWork, to ensure that the job does
not wait forever.
Change-Id: I60c911c7aae872ee6b94e68efa53638207c0180d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2218
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 01201a6..ab7a3db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -642,27 +642,32 @@
return doomed;
}
- public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
- TaskAttemptId taId = ta.getTaskAttemptId();
- TaskCluster tc = ta.getTask().getTaskCluster();
- TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
- if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) {
- LOGGER.warning(
- "Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
- return;
- }
- TaskAttempt.TaskStatus taStatus = ta.getStatus();
- if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
- LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
- return;
- }
- ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
- ta.setEndTime(System.currentTimeMillis());
- if (lastAttempt.decrementPendingTasksCounter() == 0) {
- lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
- lastAttempt.setEndTime(System.currentTimeMillis());
- inProgressTaskClusters.remove(tc);
- startRunnableActivityClusters();
+ public void notifyTaskComplete(TaskAttempt ta) {
+ try {
+ TaskAttemptId taId = ta.getTaskAttemptId();
+ TaskCluster tc = ta.getTask().getTaskCluster();
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+ if (lastAttempt == null || taId.getAttempt() != lastAttempt.getAttempt()) {
+ LOGGER.warning(() -> "Ignoring task complete notification: " + taId + " -- Current last attempt = "
+ + lastAttempt);
+ return;
+ }
+ TaskAttempt.TaskStatus taStatus = ta.getStatus();
+ if (taStatus != TaskAttempt.TaskStatus.RUNNING) {
+ LOGGER.warning(() -> "Spurious task complete notification: " + taId + " Current state = " + taStatus);
+ return;
+ }
+ ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
+ ta.setEndTime(System.currentTimeMillis());
+ if (lastAttempt.decrementPendingTasksCounter() == 0) {
+ lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+ lastAttempt.setEndTime(System.currentTimeMillis());
+ inProgressTaskClusters.remove(tc);
+ startRunnableActivityClusters();
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting job " + jobRun.getJobId());
+ abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
}
}
@@ -724,6 +729,7 @@
ta -> HyracksException.create(ErrorCode.NODE_FAILED, ta.getNodeId()));
startRunnableActivityClusters();
} catch (Exception e) {
+ LOGGER.log(Level.SEVERE, e, () -> "Unexpected failure. Aborting job " + jobRun.getJobId());
abortJob(Collections.singletonList(e), NoOpCallback.INSTANCE);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index f4f2f52..e2f8b0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -21,7 +21,6 @@
import java.util.Map;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.job.IJobManager;
@@ -42,23 +41,19 @@
@Override
protected void performEvent(TaskAttempt ta) {
- try {
- IJobManager jobManager = ccs.getJobManager();
- JobRun run = jobManager.get(jobId);
- if (statistics != null) {
- JobProfile jobProfile = run.getJobProfile();
- Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
- JobletProfile jobletProfile = jobletProfiles.get(nodeId);
- if (jobletProfile == null) {
- jobletProfile = new JobletProfile(nodeId);
- jobletProfiles.put(nodeId, jobletProfile);
- }
- jobletProfile.getTaskProfiles().put(taId, statistics);
+ IJobManager jobManager = ccs.getJobManager();
+ JobRun run = jobManager.get(jobId);
+ if (statistics != null) {
+ JobProfile jobProfile = run.getJobProfile();
+ Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
+ JobletProfile jobletProfile = jobletProfiles.get(nodeId);
+ if (jobletProfile == null) {
+ jobletProfile = new JobletProfile(nodeId);
+ jobletProfiles.put(nodeId, jobletProfile);
}
- run.getExecutor().notifyTaskComplete(ta);
- } catch (HyracksException e) {
- e.printStackTrace();
+ jobletProfile.getTaskProfiles().put(taId, statistics);
}
+ run.getExecutor().notifyTaskComplete(ta);
}
@Override