[NO ISSUE]: Avoid incrementing job counters for job cleanup tasks
Change-Id: I70cd1db750cd30930f61a790e5c1bc69f4f866cf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18298
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 3a954f4..2b03da5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.IError;
+import org.apache.hyracks.api.exceptions.IFormattedException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -148,7 +149,6 @@
// trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job.
// Therefore, we do not remove the job out of activeRunMap here.
jobRun.getExecutor().cancelJob(callback);
- incrementCancelledJobs();
return;
}
// Removes a pending job.
@@ -237,11 +237,7 @@
@Override
public void finalComplete(JobRun run) throws HyracksException {
checkJob(run);
- if (run.getPendingStatus() == JobStatus.FAILURE) {
- incrementFailedJobs();
- } else if (run.getPendingStatus() == JobStatus.TERMINATED) {
- incrementSuccessfulJobs();
- }
+ boolean successful = run.getPendingStatus() == JobStatus.TERMINATED;
JobId jobId = run.getJobId();
Throwable caughtException = null;
@@ -256,6 +252,8 @@
run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
run.setEndTime(System.currentTimeMillis());
if (activeRunMap.remove(jobId) != null) {
+ incrementJobCounters(run, successful);
+
// non-active jobs have zero capacity
releaseJobCapacity(run);
}
@@ -283,6 +281,41 @@
}
}
+ /**
+ * Increments the job counters depending on the status
+ *
+ * @param run job run
+ * @param successful if job is successful
+ */
+ private void incrementJobCounters(JobRun run, boolean successful) {
+ if (successful) {
+ incrementSuccessfulJobs();
+ return;
+ }
+
+ if (run.getExceptions() != null && !run.getExceptions().isEmpty() && isCancelledJob(run)) {
+ incrementCancelledJobs();
+ } else {
+ incrementFailedJobs();
+ }
+ }
+
+ /**
+ * Checks the exceptions for a job run to see if the job is cancelled
+ *
+ * @param run job run
+ * @return true if cancelled job, false otherwise
+ */
+ private boolean isCancelledJob(JobRun run) {
+ List<Exception> exceptions = run.getExceptions();
+ for (Exception e : exceptions) {
+ if (e instanceof IFormattedException f && f.getErrorCode() == ErrorCode.JOB_CANCELED.intValue()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public Collection<JobRun> getRunningJobs() {
return activeRunMap.values();