[NO ISSUE][CLUS] Ensure Active Jobs Capacity is Released Only Once
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
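- Release a job's capacity only once: capacity is now released only when the job is actually removed from the active run map.
- Log a warning if the cluster's current capacity exceeds its maximum capacity after a job's capacity has been added back.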
Change-Id: Ia53c6918a68f7050bd8af482dbe8e161d1315844
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2938
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index 8ea1fa7..b123a5e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -26,12 +26,15 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
// To avoid the computation cost for checking the capacity constraint for each node,
// currently the admit/allocation decisions are based on the aggregated resource information.
// TODO(buyingyi): investigate partition-aware resource control.
public class JobCapacityController implements IJobCapacityController {
+ private static final Logger LOGGER = LogManager.getLogger();
private final IResourceManager resourceManager;
public JobCapacityController(IResourceManager resourceManager) {
@@ -71,6 +74,16 @@
int aggregatedNumCores = currentCapacity.getAggregatedCores();
currentCapacity.setAggregatedMemoryByteSize(aggregatedMemoryByteSize + reqAggregatedMemoryByteSize);
currentCapacity.setAggregatedCores(aggregatedNumCores + reqAggregatedNumCores);
+ ensureMaxCapacity();
}
+ private void ensureMaxCapacity() { // Sanity check: logs a warning when current capacity is above the maximum; it only logs and does not adjust any value.
+ final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity();
+ final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity();
+ if (currentCapacity.getAggregatedCores() > maximumCapacity.getAggregatedCores() // either dimension (cores or memory) above the maximum triggers the warning
+ || currentCapacity.getAggregatedMemoryByteSize() > maximumCapacity.getAggregatedMemoryByteSize()) {
+ LOGGER.warn("Current cluster available capacity {} is more than its maximum capacity {}", currentCapacity,
+ maximumCapacity);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 7dc636c..7e1ca61 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -229,8 +229,9 @@
}
run.setStatus(run.getPendingStatus(), run.getPendingExceptions());
run.setEndTime(System.currentTimeMillis());
- if (activeRunMap.remove(jobId) == null) {
- LOGGER.warn("Job {} was not found running but is getting archived and capacity released", jobId);
+ if (activeRunMap.remove(jobId) != null) {
+ // non-active jobs have zero capacity
+ releaseJobCapacity(run);
}
runMapArchive.put(jobId, run);
runMapHistory.put(jobId, run.getExceptions());
@@ -247,10 +248,6 @@
}
}
- // Releases cluster capacitys occupied by the job.
- JobSpecification job = run.getJobSpecification();
- jobCapacityController.release(job);
-
// Picks the next job to execute.
pickJobsToRun();
@@ -347,4 +344,9 @@
throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
}
}
+
+ private void releaseJobCapacity(JobRun jobRun) { // Passes the run's job specification to jobCapacityController.release(...).
+ final JobSpecification job = jobRun.getJobSpecification();
+ jobCapacityController.release(job);
+ }
}