[NO ISSUE][RT] Fail Fast On Request Cancellation After Job Completion
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- If a request is cancelled after its job completion but before
the result written is returned to the caller, fail the request
with request cancelled exception.
- Freeze request elapsed time after the request completes to
make sure archived requests return consistent elapsed time.
- Ensure requests that might be cancelled and completed at the same
time are archived only once.
Change-Id: If4f154f11305e2f8286e0707b5b3adec905633a4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3321
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index b7ef4e6..51971a5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -33,6 +33,7 @@
private boolean complete;
private final IRequestReference requestReference;
private boolean cancellable = false;
+ private volatile long completionTime = -1;
protected volatile State state = State.RECEIVED;
public BaseClientRequest(IRequestReference requestReference) {
@@ -46,6 +47,7 @@
}
complete = true;
state = State.COMPLETED;
+ completionTime = System.currentTimeMillis();
}
@Override
@@ -66,6 +68,11 @@
}
@Override
+ public synchronized boolean isCancelled() {
+ return state == State.CANCELLED;
+ }
+
+ @Override
public String getId() {
// the uuid is generated by the node which received the request
// so there is a chance this might not be unique now
@@ -102,7 +109,8 @@
private String getElapsedTime() {
// this is just an estimation as the request might have been received on a node with a different system time
// TODO add dynamic time unit
- return System.currentTimeMillis() - requestReference.getTime() + "ms";
+ long runningTime = completionTime > 0 ? completionTime : System.currentTimeMillis();
+ return runningTime - requestReference.getTime() + "ms";
}
protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0996c6c..42b573f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2623,6 +2623,7 @@
hcc.waitForCompletion(jobId);
} else {
hcc.waitForCompletion(jobId);
+ ensureNotCancelled(clientRequest);
printer.print(jobId);
}
} catch (Exception e) {
@@ -2997,4 +2998,10 @@
throws Exception {
validateIfResourceIsActiveInFeed(metadataProvider.getApplicationContext(), dataset, sourceLoc);
}
+
+ private static void ensureNotCancelled(ClientRequest clientRequest) throws RuntimeDataException {
+ if (clientRequest.isCancelled()) {
+ throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 515c837..921fb64 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -42,6 +42,13 @@
}
/**
+ * A flag indicating if the request has been cancelled
+ *
+ * @return true if the request was cancelled, otherwise false
+ */
+ boolean isCancelled();
+
+ /**
* A system wide unique id representing this {@link IClientRequest}
*
* @return the system request id
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index a8749b1..333c709 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -99,12 +99,14 @@
}
private void untrack(IClientRequest request) {
- runningRequests.remove(request.getId());
- final String clientContextId = request.getClientContextId();
- if (clientContextId != null) {
- clientIdRequests.remove(request.getClientContextId());
+ final IClientRequest completedRequest = runningRequests.remove(request.getId());
+ if (completedRequest != null) {
+ final String clientContextId = completedRequest.getClientContextId();
+ if (clientContextId != null) {
+ clientIdRequests.remove(completedRequest.getClientContextId());
+ }
+ archive(completedRequest);
}
- archive(request);
}
private synchronized void archive(IClientRequest request) {