[NO ISSUE][API] Allow Request Cancellation By Request UUID
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
- Accept UUID as a parameter in query cancellation API.
- Pass request UUID to CC in cancellation requests to
allow cancellation of requests that do not have
clientContextId.
Change-Id: Ic811b2107b71e50ca9c216db39755467c4320fd1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3225
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
index d260a0b..8d435c4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
@@ -38,6 +38,8 @@
* The servlet provides a REST API for cancelling an on-going query.
*/
public class CcQueryCancellationServlet extends AbstractServlet {
+
+ public static final String REQUEST_UUID_PARAM_NAME = "uuid";
private static final Logger LOGGER = LogManager.getLogger();
private final ICcApplicationContext appCtx;
@@ -49,15 +51,15 @@
@Override
protected void delete(IServletRequest request, IServletResponse response) throws IOException {
- String clientContextId = request.getParameter(Parameter.CLIENT_ID.str());
- if (clientContextId == null) {
+ String uuid = request.getParameter(REQUEST_UUID_PARAM_NAME);
+ String clientCtxId = request.getParameter(Parameter.CLIENT_ID.str());
+ if (uuid == null && clientCtxId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final IClientRequest req = requestTracker.getByClientContextId(clientContextId);
+ IClientRequest req = uuid != null ? requestTracker.get(uuid) : requestTracker.getByClientContextId(clientCtxId);
if (req == null) {
- // response: NOT FOUND
response.setStatus(HttpResponseStatus.NOT_FOUND);
return;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 60806d3..8a4ac5c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.api.http.server;
+import static org.apache.asterix.api.http.server.CcQueryCancellationServlet.REQUEST_UUID_PARAM_NAME;
import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
import java.util.concurrent.ConcurrentMap;
@@ -55,16 +56,16 @@
@Override
protected void delete(IServletRequest request, IServletResponse response) {
- // gets the parameter client_context_id from the request.
+ String uuid = request.getParameter(REQUEST_UUID_PARAM_NAME);
String clientContextId = request.getParameter(Parameter.CLIENT_ID.str());
- if (clientContextId == null) {
+ if (uuid == null && clientContextId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
final MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture();
try {
- CancelQueryRequest cancelQueryMessage =
- new CancelQueryRequest(serviceCtx.getNodeId(), cancelQueryFuture.getFutureId(), clientContextId);
+ CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(serviceCtx.getNodeId(),
+ cancelQueryFuture.getFutureId(), uuid, clientContextId);
// TODO(mblow): multicc -- need to send cancellation to the correct cc
messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
CancelQueryResponse cancelResponse =
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index d9b69cb..5a6bdd7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -94,13 +94,13 @@
try {
responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- cancelQuery(ncMb, ncCtx.getNodeId(), param.getClientContextID(), e, false);
+ cancelQuery(ncMb, ncCtx.getNodeId(), requestReference.getUuid(), param.getClientContextID(), e, false);
throw e;
} catch (TimeoutException exception) {
RuntimeDataException hde = new RuntimeDataException(ErrorCode.REQUEST_TIMEOUT);
hde.addSuppressed(exception);
// cancel query
- cancelQuery(ncMb, ncCtx.getNodeId(), param.getClientContextID(), hde, true);
+ cancelQuery(ncMb, ncCtx.getNodeId(), requestReference.getUuid(), param.getClientContextID(), hde, true);
throw hde;
}
execution.end();
@@ -134,15 +134,15 @@
printExecutionPlans(sessionOutput, responseMsg.getExecutionPlans());
}
- private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID, Exception exception,
- boolean wait) {
- if (clientContextID == null) {
+ private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String uuid, String clientContextID,
+ Exception exception, boolean wait) {
+ if (uuid == null && clientContextID == null) {
return;
}
MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture();
try {
CancelQueryRequest cancelQueryMessage =
- new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
+ new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), uuid, clientContextID);
// TODO(mblow): multicc -- need to send cancellation to the correct cc
LOGGER.info("Cancelling query due to {}", exception.getClass().getSimpleName());
messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index e4cef7f..6154faa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -35,22 +35,24 @@
private static final long serialVersionUID = 1L;
private final String nodeId;
private final long reqId;
+ private final String uuid;
private final String contextId;
- public CancelQueryRequest(String nodeId, long reqId, String contextId) {
+ public CancelQueryRequest(String nodeId, long reqId, String uuid, String contextId) {
this.nodeId = nodeId;
this.reqId = reqId;
+ this.uuid = uuid;
this.contextId = contextId;
}
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
final IRequestTracker requestTracker = appCtx.getRequestTracker();
- IClientRequest req = requestTracker.getByClientContextId(contextId);
+ IClientRequest req = uuid != null ? requestTracker.get(uuid) : requestTracker.getByClientContextId(contextId);
RequestStatus status;
if (req == null) {
- LOGGER.log(Level.WARN, "No job found for context id " + contextId);
+ LOGGER.log(Level.INFO, "No request found for uuid {} or context id {}", uuid, contextId);
status = RequestStatus.NOT_FOUND;
} else {
if (!req.isCancellable()) {