[NO ISSUE][HYR] Ignore Job Notifications For Unknown Jobs
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ignore notifications for unknown jobs.
Change-Id: I743b469f56f6126c2fdd4161fcaa48fc5f4d7218
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2793
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 3ba25f5..b6b3e40 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -157,8 +157,8 @@
@Override
public void prepareComplete(JobRun run, JobStatus status, List<Exception> exceptions) throws HyracksException {
- ccs.removeJobParameterByteStore(run.getJobId());
checkJob(run);
+ ccs.removeJobParameterByteStore(run.getJobId());
if (status == JobStatus.FAILURE_BEFORE_EXECUTION) {
run.setPendingStatus(JobStatus.FAILURE, exceptions);
finalComplete(run);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index f847cdb..77d2f82 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -52,10 +52,14 @@
@Override
public void run() {
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Cleanup for JobRun with id: " + jobId);
+ LOGGER.info("Cleanup for job: {}", jobId);
+ }
+ final JobRun jobRun = jobManager.get(jobId);
+ if (jobRun == null) {
+ LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId);
+ return;
}
try {
- JobRun jobRun = jobManager.get(jobId);
jobManager.prepareComplete(jobRun, status, exceptions);
callback.setValue(null);
} catch (HyracksException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index e2f8b0d..0c53142 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -43,6 +43,9 @@
protected void performEvent(TaskAttempt ta) {
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
+ if (run == null) {
+ return;
+ }
if (statistics != null) {
JobProfile jobProfile = run.getJobProfile();
Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index a2be15c..e7af300 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -45,6 +45,9 @@
LOGGER.log(Level.WARN, "Executing task failure work for " + this, exceptions.get(0));
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
+ if (run == null) {
+ return;
+ }
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
run.getExecutor().notifyTaskFailure(ta, exceptions);
}