[NO ISSUE][HYR] Ensure IJobLifecycleListener is notified on cancelled queued jobs
Change-Id: I7e26c9d1015725f895876f5873eccd3f86b17653
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2624
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 7aa84e2..210779e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -40,6 +40,17 @@
return new HyracksException(cause);
}
+ public static HyracksException wrapOrThrowUnchecked(Throwable cause) {
+ if (cause instanceof Error) {
+ throw (Error) cause;
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else if (cause instanceof HyracksException) {
+ return (HyracksException) cause;
+ }
+ return new HyracksException(cause);
+ }
+
public static HyracksException create(int code, Serializable... params) {
return new HyracksException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), params);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 9fd1a02..3ba25f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -31,12 +31,14 @@
import java.util.Set;
import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.application.CCServiceContext;
@@ -65,8 +67,7 @@
private final IJobCapacityController jobCapacityController;
private IJobQueue jobQueue;
- public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController)
- throws HyracksException {
+ public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) {
this.ccs = ccs;
this.jobCapacityController = jobCapacityController;
try {
@@ -116,6 +117,8 @@
case EXECUTE:
executeJob(jobRun);
break;
+ default:
+ throw new IllegalStateException("unknown submission status: " + status);
}
}
@@ -136,9 +139,18 @@
List<Exception> exceptions =
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
// Since the job has not been executed, we only need to update its status and lifecyle here.
- jobRun.setStatus(JobStatus.FAILURE, exceptions);
+ jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
runMapArchive.put(jobId, jobRun);
runMapHistory.put(jobId, exceptions);
+ CCServiceContext serviceCtx = ccs.getContext();
+ if (serviceCtx != null) {
+ try {
+ serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+ } catch (Exception e) {
+ LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
+ throw HyracksDataException.create(e);
+ }
+ }
}
callback.setValue(null);
}
@@ -152,14 +164,12 @@
finalComplete(run);
return;
}
- JobId jobId = run.getJobId();
- HyracksException caughtException = null;
if (run.getPendingStatus() != null && run.getCleanupPendingNodeIds().isEmpty()) {
finalComplete(run);
return;
}
if (run.getPendingStatus() != null) {
- LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: " + jobId);
+ LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
return;
}
Set<String> targetNodes = run.getParticipatingNodeIds();
@@ -168,38 +178,40 @@
run.setPendingStatus(status, exceptions);
}
- if (targetNodes != null && !targetNodes.isEmpty()) {
- INodeManager nodeManager = ccs.getNodeManager();
- Set<String> toDelete = new HashSet<>();
- for (String n : targetNodes) {
- NodeControllerState ncs = nodeManager.getNodeControllerState(n);
- try {
- if (ncs == null) {
- toDelete.add(n);
- } else {
- ncs.getNodeController().cleanUpJoblet(jobId, status);
- }
- } catch (Exception e) {
- LOGGER.log(Level.ERROR, e.getMessage(), e);
- if (caughtException == null) {
- caughtException = HyracksException.create(e);
- } else {
- caughtException.addSuppressed(e);
- }
- }
- }
- targetNodes.removeAll(toDelete);
- run.getCleanupPendingNodeIds().removeAll(toDelete);
- if (run.getCleanupPendingNodeIds().isEmpty()) {
- finalComplete(run);
- }
+ if (!targetNodes.isEmpty()) {
+ cleanupJobOnNodes(run, status, targetNodes);
} else {
finalComplete(run);
}
+ }
+
+ private void cleanupJobOnNodes(JobRun run, JobStatus status, Set<String> targetNodes) throws HyracksException {
+ Throwable caughtException = null;
+ JobId jobId = run.getJobId();
+ INodeManager nodeManager = ccs.getNodeManager();
+ Set<String> toDelete = new HashSet<>();
+ for (String n : targetNodes) {
+ NodeControllerState ncs = nodeManager.getNodeControllerState(n);
+ if (ncs == null) {
+ toDelete.add(n);
+ } else {
+ try {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ } catch (Exception e) {
+ LOGGER.error("Exception cleaning up joblet {} on node {}", jobId, n, e);
+ caughtException = ExceptionUtils.suppress(caughtException, e);
+ }
+ }
+ }
+ targetNodes.removeAll(toDelete);
+ run.getCleanupPendingNodeIds().removeAll(toDelete);
+ if (run.getCleanupPendingNodeIds().isEmpty()) {
+ finalComplete(run);
+ }
// throws caught exceptions if any
if (caughtException != null) {
- throw caughtException;
+ throw HyracksException.wrapOrThrowUnchecked(caughtException);
}
}
@@ -207,13 +219,13 @@
public void finalComplete(JobRun run) throws HyracksException {
checkJob(run);
JobId jobId = run.getJobId();
- HyracksException caughtException = null;
+ Throwable caughtException = null;
CCServiceContext serviceCtx = ccs.getContext();
if (serviceCtx != null) {
try {
serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
- } catch (HyracksException e) {
- LOGGER.log(Level.ERROR, e.getMessage(), e);
+ } catch (Exception e) {
+ LOGGER.error("Exception notifying job finish {}", jobId, e);
caughtException = e;
}
}
@@ -224,18 +236,14 @@
runMapHistory.put(jobId, run.getExceptions());
if (run.getActivityClusterGraph().isReportTaskDetails()) {
- /**
+ /*
* log job details when profiling is enabled
*/
try {
ccs.getJobLogFile().log(createJobLogObject(run));
} catch (Exception e) {
- LOGGER.log(Level.ERROR, e.getMessage(), e);
- if (caughtException == null) {
- caughtException = HyracksException.create(e);
- } else {
- caughtException.addSuppressed(e);
- }
+ LOGGER.error("Exception reporting task details for job {}", jobId, e);
+ caughtException = ExceptionUtils.suppress(caughtException, e);
}
}
@@ -248,7 +256,7 @@
// throws caught exceptions if any
if (caughtException != null) {
- throw caughtException;
+ throw HyracksException.wrapOrThrowUnchecked(caughtException);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index e4699c7..5b98260 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -209,17 +209,12 @@
}
public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
- Map<Integer, String> locations = operatorLocations.get(op);
- if (locations == null) {
- locations = new HashMap<Integer, String>();
- operatorLocations.put(op, locations);
- }
- locations.put(partition, location);
+ operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
}
@Override
public synchronized void waitForCompletion() throws Exception {
- while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
+ while (status == JobStatus.PENDING || status == JobStatus.RUNNING) {
wait();
}
if (exceptions != null && !exceptions.isEmpty()) {