Minor cleanups for JobManager and ICCApplicationContext.
Change-Id: Iba7b6fd9b75a141fcc9f589db4b8cdeac570ec2d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1471
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 5f4877d..55cc5fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -22,7 +22,6 @@
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.job.IJobLifecycleListener;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
/**
* Application Context at the Cluster Controller for an application.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 1a363c7..ae0f361 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.FileReader;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -54,11 +55,12 @@
import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobManager;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.cc.scheduler.ResourceManager;
import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
+import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
@@ -203,10 +205,19 @@
}
// Job manager is in charge of job lifecycle management.
- Constructor<?> jobManagerConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobManagerClassName)
- .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
- jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
-
+ try {
+ Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
+ .loadClass(ccConfig.jobManagerClassName)
+ .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+ jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
+ | InvocationTargetException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+ }
+ // Falls back to the default implementation if the user-provided class name is not valid.
+ jobManager = new JobManager(ccConfig, this, jobCapacityController);
+ }
}
private void connectNCs() throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8f621df..180e850 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -86,13 +86,14 @@
JobRun get(JobId jobId);
/**
- * Retrieves a historical job from a given job id.
+ * Retrieves the exception records for a historical job.
*
* @param jobId,
* the job id.
- * @return the matched historical jobs that have been run but not yet discarded.
+ * @return the exception records of a job that has been removed from the archive but still stays in the stored
+ * history, where each historical job is paired with exceptions during its execution.
*/
- List<Exception> getRunHistory(JobId jobId);
+ List<Exception> getExceptionHistory(JobId jobId);
/**
* @return all jobs that are currently running.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 52ad301..e3f9557 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -44,6 +44,7 @@
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.application.CCApplicationContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
import org.apache.hyracks.control.cc.scheduler.IJobQueue;
import org.apache.hyracks.control.cc.work.JobCleanupWork;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -58,11 +59,11 @@
private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
private final ClusterControllerService ccs;
- private final IJobQueue jobQueue;
private final Map<JobId, JobRun> activeRunMap;
private final Map<JobId, JobRun> runMapArchive;
private final Map<JobId, List<Exception>> runMapHistory;
private final IJobCapacityController jobCapacityController;
+ private IJobQueue jobQueue;
public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController)
throws HyracksException {
@@ -74,7 +75,11 @@
jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
- throw HyracksException.create(ErrorCode.CLASS_LOADING_ISSUE, e, e.getMessage());
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+ }
+ // Falls back to the default implementation if the user-provided class name is not valid.
+ jobQueue = new FIFOJobQueue(this, jobCapacityController);
}
activeRunMap = new HashMap<>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -248,7 +253,7 @@
}
@Override
- public List<Exception> getRunHistory(JobId jobId) {
+ public List<Exception> getExceptionHistory(JobId jobId) {
return runMapHistory.get(jobId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index f7ef175..713cf96 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -55,7 +55,7 @@
}
});
} else {
- final List<Exception> exceptions = jobManager.getRunHistory(jobId);
+ final List<Exception> exceptions = jobManager.getExceptionHistory(jobId);
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {