[NO ISSUE][ACTIVE] Account for force stop while suspending
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- When a failure happens while trying to suspend ingestion,
we will force stop the active job. If the job completes
ungracefully, we set the listener state to TEMPORARILY_FAILED.
However, since force to stop only waits for STOPPED state,
the thread waiting for ingestion to be suspended will wait
forever. This change accounts for such case and makes
the force stop waits for TEMPORARILY_FAILED too.
Change-Id: Ib33f191be2b84d97a08e3bc6d607b0edbf35bed1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13144
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 0242ecd..ddd3d64 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -195,7 +195,7 @@
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "the job " + jobId + " finished");
+ LOGGER.log(level, "the job {} finished", jobId);
}
JobId lastJobId = jobId;
if (numRegistered != numDeRegistered) {
@@ -208,7 +208,7 @@
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "The job finished with status: " + jobStatus);
+ LOGGER.log(level, "The job finished with status: {}", jobStatus);
}
if (!jobSuccessfullyTerminated(jobStatus)) {
jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
@@ -440,8 +440,9 @@
private void cancelJob(Throwable th) {
cancelJobSafely(metadataProvider, th);
+ // we can come here due to a failure while in suspending state
final WaitForStateSubscriber cancelSubscriber =
- new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED));
+ new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.TEMPORARILY_FAILED));
final Span span = Span.start(2, TimeUnit.MINUTES);
InvokeUtil.doUninterruptibly(() -> {
if (!cancelSubscriber.sync(span)) {
@@ -491,6 +492,7 @@
forceStop(subscriber, ie);
Thread.currentThread().interrupt();
} catch (Throwable e) {
+ LOGGER.error("forcing active job stop due to", e);
forceStop(subscriber, e);
} finally {
Thread.currentThread().setName(nameBefore);