[ASTERIXDB-3467][HYR] ConcurrentModificationException when picking new jobs to run
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
When picking new jobs from the job queue, jobs that cannot be
picked (e.g. due to cluster state) are now collected first and
failed via jobManager.prepareComplete() only after the queue
iteration finishes, instead of being failed one by one during
the iteration. Completing a job in the middle of the iteration
could lead to that job calling pickJobsToRun() again and
concurrently modifying the job queue map.
Ext-ref: MB-62857
Change-Id: I6ec0c6625d9d84cd0797964781256e93f5346a91
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18512
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index 236056c..1296d04 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -53,7 +53,7 @@
public JobSubmissionStatus allocate(JobSpecification job, JobId jobId, Set<JobFlag> jobFlags)
throws HyracksException {
if (!ccApp.acceptingJobs(jobFlags)) {
- throw HyracksDataException.create(ErrorCode.JOB_REJECTED, job);
+ throw HyracksDataException.create(ErrorCode.JOB_REJECTED, jobId);
}
IClusterCapacity requiredCapacity = job.getRequiredClusterCapacity();
long reqAggregatedMemoryByteSize = requiredCapacity.getAggregatedMemoryByteSize();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 7da6bbd..e94c12e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -146,7 +146,7 @@
126 = Illegal state. %1$s
127 = Decoding error - %1$s
128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
-129 = Job %1$s not run. Cluster is not accepting jobs
+129 = Job %1$s failed to run. Cluster is not accepting jobs.
130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index d003853..ec9333c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
@@ -81,9 +82,10 @@
}
@Override
- public List<JobRun> pull() {
+ public synchronized List<JobRun> pull() {
List<JobRun> jobRuns = new ArrayList<>();
Iterator<JobRun> runIterator = jobListMap.values().iterator();
+ List<Pair<JobRun, List<Exception>>> failingJobs = null;
while (runIterator.hasNext()) {
JobRun run = runIterator.next();
JobSpecification job = run.getJobSpecification();
@@ -98,13 +100,21 @@
runIterator.remove(); // Removes the selected job.
}
} catch (HyracksException exception) {
- // The required capacity exceeds maximum capacity.
- List<Exception> exceptions = new ArrayList<>();
+ if (failingJobs == null) {
+ failingJobs = new ArrayList<>();
+ }
+ // The required capacity exceeds maximum capacity or the job cannot be run at this time.
+ List<Exception> exceptions = new ArrayList<>(1);
exceptions.add(exception);
+ failingJobs.add(Pair.of(run, exceptions));
runIterator.remove(); // Removes the job from the queue.
+ }
+ }
+ if (failingJobs != null) {
+ for (int i = 0; i < failingJobs.size(); i++) {
try {
- // Fails the job.
- jobManager.prepareComplete(run, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+ Pair<JobRun, List<Exception>> job = failingJobs.get(i);
+ jobManager.prepareComplete(job.getLeft(), JobStatus.FAILURE_BEFORE_EXECUTION, job.getRight());
} catch (HyracksException e) {
LOGGER.log(Level.ERROR, e.getMessage(), e);
}