Add Parameter To Control Job Queue Capacity
Exposed CC property job.queue.capacity (-job-queue-capacity) to enable
ability to override default capacity of 4k
Change-Id: I36d0727de58dbe9697e3693e49b39f8c8ab32ce8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1562
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 21fc08f..8fe542f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -118,4 +118,8 @@
*/
Collection<JobRun> getArchivedJobs();
+ /**
+ * @return the maximum number of jobs to queue before rejecting new jobs
+ */
+ int getJobQueueCapacity();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index b35de3d..649487b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -283,6 +283,11 @@
return runMapHistory.get(jobId);
}
+ @Override
+ public int getJobQueueCapacity() {
+ return ccs.getCCConfig().getJobQueueCapacity();
+ }
+
private void pickJobsToRun() throws HyracksException {
List<JobRun> selectedRuns = jobQueue.pull();
for (JobRun run : selectedRuns) {
@@ -335,5 +340,4 @@
throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER);
}
}
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 0377692..833baac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -44,22 +44,23 @@
public class FIFOJobQueue implements IJobQueue {
private static final Logger LOGGER = Logger.getLogger(FIFOJobQueue.class.getName());
- private static final int CAPACITY = 4096;
private final Map<JobId, JobRun> jobListMap = new LinkedHashMap<>();
private final IJobManager jobManager;
private final IJobCapacityController jobCapacityController;
+ private final int jobQueueCapacity;
public FIFOJobQueue(IJobManager jobManager, IJobCapacityController jobCapacityController) {
this.jobManager = jobManager;
this.jobCapacityController = jobCapacityController;
+ this.jobQueueCapacity = jobManager.getJobQueueCapacity();
}
@Override
public void add(JobRun run) throws HyracksException {
int size = jobListMap.size();
- if (size >= CAPACITY) {
- throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, CAPACITY);
+ if (size >= jobQueueCapacity) {
+ throw HyracksException.create(ErrorCode.JOB_QUEUE_FULL, jobQueueCapacity);
}
jobListMap.put(run.getJobId(), run);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 97b05e7..1628248 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -285,6 +285,7 @@
when(ccs.getApplicationContext()).thenReturn(appCtx);
when(ccs.getJobLogFile()).thenReturn(logFile);
when(ccs.getNodeManager()).thenReturn(nodeManager);
+ when(ccs.getCCConfig()).thenReturn(ccConfig);
return ccs;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index fbde58c..97438c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.util.file.FileUtil;
import org.ini4j.Ini;
+@SuppressWarnings("SameParameterValue")
public class CCConfig extends ControllerConfig {
public static String defaultDir = System.getProperty("java.io.tmpdir");
@@ -60,6 +61,7 @@
ROOT_DIR(STRING, (Supplier<String>)() -> FileUtil.joinPath(defaultDir, "ClusterControllerService")),
CLUSTER_TOPOLOGY(STRING),
JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
+ JOB_QUEUE_CAPACITY(INTEGER, 4096),
JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
private final IOptionType parser;
@@ -143,6 +145,8 @@
return "Sets the XML file that defines the cluster topology";
case JOB_QUEUE_CLASS:
return "Specify the implementation class name for the job queue";
+ case JOB_QUEUE_CAPACITY:
+ return "The maximum number of jobs to queue before rejecting new jobs";
case JOB_MANAGER_CLASS:
return "Specify the implementation class name for the job manager";
default:
@@ -333,4 +337,8 @@
public void setJobManagerClass(String jobManagerClass) {
configManager.set(Option.JOB_MANAGER_CLASS, jobManagerClass);
}
+
+ public int getJobQueueCapacity() {
+ return getAppConfig().getInt(Option.JOB_QUEUE_CAPACITY);
+ }
}