[NO ISSUE][MISC] Introduce IClientRequest
- user model changes: no
- storage format changes: no
- interface changes: yes
+ IClientRequest: used to represent a client request
that can be cancelled.
Details:
- Introduce IClientRequest to allow for multiple types of requests
to be cancellable.
Change-Id: I8f65da1744ea7ecf26ea3f8a576ebaf4472ccd62
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2774
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
index 15267aa..a37f802 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
@@ -22,6 +22,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -45,10 +46,11 @@
* @param requestParameters
* @param metadataProvider
* @param resultSetId
+ * @param executorCtx
* @throws HyracksDataException
* @throws AlgebricksException
*/
public abstract void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
- IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
- throws HyracksDataException, AlgebricksException;
+ IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
+ IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException;
}
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
new file mode 100644
index 0000000..a9bd856
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class BaseClientRequest implements IClientRequest {
+ protected final IStatementExecutorContext ctx;
+ protected final String contextId;
+ private boolean complete;
+
+ public BaseClientRequest(IStatementExecutorContext ctx, String contextId) {
+ this.ctx = ctx;
+ this.contextId = contextId;
+ }
+
+ @Override
+ public synchronized void complete() {
+ if (complete) {
+ return;
+ }
+ complete = true;
+ ctx.remove(contextId);
+ }
+
+ @Override
+ public synchronized void cancel(ICcApplicationContext appCtx) throws HyracksDataException {
+ if (complete) {
+ return;
+ }
+ complete();
+ doCancel(appCtx);
+ }
+
+ protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
new file mode 100644
index 0000000..520ce03
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+public class ClientJobRequest extends BaseClientRequest {
+ private final JobId jobId;
+
+ public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) {
+ super(ctx, clientCtxId);
+ this.jobId = jobId;
+ }
+
+ @Override
+ protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
+ IHyracksClientConnection hcc = appCtx.getHcc();
+ try {
+ hcc.cancelJob(jobId);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ ctx.remove(contextId);
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 81e1ebf..78080f3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -19,32 +19,31 @@
package org.apache.asterix.translator;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.asterix.common.api.IClientRequest;
/**
- * The context for statement executors, which maintains the meta information of all queries.
- * TODO(yingyi): also maintain the mapping from server generated request ids to jobs.
+ * The context for statement executors. Maintains ongoing user requests.
*/
public interface IStatementExecutorContext {
/**
- * Gets the Hyracks JobId from the user-provided client context id.
+ * Gets the client request from the user-provided client context id.
*
* @param clientContextId,
* a user provided client context id.
- * @return the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
+ * @return the client request
*/
- JobId getJobIdFromClientContextId(String clientContextId);
+ IClientRequest get(String clientContextId);
/**
- * Puts a client context id for a statement and the corresponding Hyracks job id.
+ * Puts a client context id for a statement and the corresponding request.
*
* @param clientContextId,
* a user provided client context id.
- * @param jobId,
+ * @param req,
* the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
*/
- void put(String clientContextId, JobId jobId);
+ void put(String clientContextId, IClientRequest req);
/**
* Removes the information about the query corresponding to a user-provided client context id.
@@ -52,5 +51,5 @@
* @param clientContextId,
* a user provided client context id.
*/
- JobId removeJobIdFromClientContextId(String clientContextId);
+ IClientRequest remove(String clientContextId);
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
index ec181bb..c4e2859 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.translator;
-import org.apache.hyracks.api.job.JobId;
+import org.apache.asterix.common.api.IClientRequest;
public class NoOpStatementExecutorContext implements IStatementExecutorContext {
@@ -28,17 +28,17 @@
}
@Override
- public JobId getJobIdFromClientContextId(String clientContextId) {
+ public IClientRequest get(String clientContextId) {
return null;
}
@Override
- public void put(String clientContextId, JobId jobId) {
+ public void put(String clientContextId, IClientRequest req) {
// Dummy for when a statement doesn't support cancellation
}
@Override
- public JobId removeJobIdFromClientContextId(String clientContextId) {
+ public IClientRequest remove(String clientContextId) {
return null;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
index 7c06762..a4da189 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -16,31 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.asterix.api.http.ctx;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.job.JobId;
public class StatementExecutorContext implements IStatementExecutorContext {
- private final Map<String, JobId> runningQueries = new ConcurrentHashMap<>();
+ private final Map<String, IClientRequest> runningQueries = new ConcurrentHashMap<>();
@Override
- public JobId getJobIdFromClientContextId(String clientContextId) {
+ public IClientRequest get(String clientContextId) {
return runningQueries.get(clientContextId);
}
@Override
- public void put(String clientContextId, JobId jobId) {
- runningQueries.put(clientContextId, jobId);
+ public void put(String clientContextId, IClientRequest req) {
+ runningQueries.put(clientContextId, req);
}
@Override
- public JobId removeJobIdFromClientContextId(String clientContextId) {
+ public IClientRequest remove(String clientContextId) {
return runningQueries.remove(clientContextId);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
similarity index 76%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
index a8b3aef..5f5692d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
@@ -22,9 +22,9 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter;
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
@@ -37,11 +37,14 @@
/**
* The servlet provides a REST API for cancelling an on-going query.
*/
-public class QueryCancellationServlet extends AbstractServlet {
+public class CcQueryCancellationServlet extends AbstractServlet {
private static final Logger LOGGER = LogManager.getLogger();
+ private final ICcApplicationContext appCtx;
- public QueryCancellationServlet(ConcurrentMap<String, Object> ctx, String... paths) {
+ public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx,
+ String... paths) {
super(ctx, paths);
+ this.appCtx = appCtx;
}
@Override
@@ -51,23 +54,17 @@
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
-
- // Retrieves the corresponding Hyracks job id.
- IStatementExecutorContext runningQueries =
+ IStatementExecutorContext executorCtx =
(IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
- IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(ServletConstants.HYRACKS_CONNECTION_ATTR);
- JobId jobId = runningQueries.getJobIdFromClientContextId(clientContextId);
-
- if (jobId == null) {
+ IClientRequest req = executorCtx.get(clientContextId);
+ if (req == null) {
// response: NOT FOUND
response.setStatus(HttpResponseStatus.NOT_FOUND);
return;
}
try {
// Cancels the on-going job.
- hcc.cancelJob(jobId);
- // Removes the cancelled query from the map activeQueries.
- runningQueries.removeJobIdFromClientContextId(clientContextId);
+ req.cancel(appCtx);
// response: OK
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 5ad451f..60806d3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,7 +42,7 @@
/**
* The servlet provides a REST API on an NC for cancelling an on-going query.
*/
-public class NCQueryCancellationServlet extends QueryCancellationServlet {
+public class NCQueryCancellationServlet extends AbstractServlet {
private static final Logger LOGGER = LogManager.getLogger();
private final INCServiceContext serviceCtx;
private final INCMessageBroker messageBroker;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ce6185e..316646d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -53,9 +53,9 @@
import org.apache.asterix.translator.ExecutionPlansJsonPrintUtil;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index a1c87d8..943aad3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -18,15 +18,14 @@
*/
package org.apache.asterix.app.message;
+import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.utils.RequestStatus;
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -51,17 +50,16 @@
ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
CCApplication application = (CCApplication) ccs.getApplication();
IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
- JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+ IClientRequest req = executorsCtx.get(contextId);
RequestStatus status;
- if (jobId == null) {
+ if (req == null) {
LOGGER.log(Level.WARN, "No job found for context id " + contextId);
status = RequestStatus.NOT_FOUND;
} else {
try {
- IHyracksClientConnection hcc = application.getHcc();
- hcc.cancelJob(jobId);
- executorsCtx.removeJobIdFromClientContextId(contextId);
+ req.cancel(appCtx);
+ executorsCtx.remove(contextId);
status = RequestStatus.SUCCESS;
} catch (Exception e) {
LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 469a2dd..0d884ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -152,6 +152,7 @@
import org.apache.asterix.om.types.TypeSignature;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
+import org.apache.asterix.translator.ClientJobRequest;
import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -408,7 +409,7 @@
break;
case EXTENSION:
((ExtensionStatement) stmt).handle(hcc, this, requestParameters, metadataProvider,
- resultSetIdCounter);
+ resultSetIdCounter, ctx);
break;
default:
throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(),
@@ -2601,6 +2602,7 @@
private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
String clientContextId, IStatementExecutorContext ctx) throws Exception {
+ ClientJobRequest req = null;
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
@@ -2609,7 +2611,8 @@
}
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
if (ctx != null && clientContextId != null) {
- ctx.put(clientContextId, jobId); // Adds the running job into the context.
+ req = new ClientJobRequest(ctx, clientContextId, jobId);
+ ctx.put(clientContextId, req); // Adds the running job into the context.
}
if (jId != null) {
jId.setValue(jobId);
@@ -2624,8 +2627,8 @@
} finally {
locker.unlock();
// No matter the job succeeds or fails, removes it into the context.
- if (ctx != null && clientContextId != null) {
- ctx.removeJobIdFromClientContextId(clientContextId);
+ if (req != null) {
+ req.complete();
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index b180ef0..f4d24e2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -36,12 +36,12 @@
import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
import org.apache.asterix.api.http.server.ApiServlet;
+import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
import org.apache.asterix.api.http.server.ClusterApiServlet;
import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
import org.apache.asterix.api.http.server.ConnectorApiServlet;
import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.QueryCancellationServlet;
import org.apache.asterix.api.http.server.QueryResultApiServlet;
import org.apache.asterix.api.http.server.QueryServiceServlet;
import org.apache.asterix.api.http.server.QueryStatusApiServlet;
@@ -271,7 +271,7 @@
protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) {
switch (key) {
case Servlets.RUNNING_REQUESTS:
- return new QueryCancellationServlet(ctx, paths);
+ return new CcQueryCancellationServlet(ctx, appCtx, paths);
case Servlets.QUERY_STATUS:
return new QueryStatusApiServlet(ctx, appCtx, paths);
case Servlets.QUERY_RESULT:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index d5262cf..eae82af 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -28,8 +28,10 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.api.http.ctx.StatementExecutorContext;
-import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.translator.ClientJobRequest;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
@@ -46,16 +48,17 @@
@Test
public void testDelete() throws Exception {
+ ICcApplicationContext appCtx = mock(ICcApplicationContext.class);
// Creates a query cancellation servlet.
- QueryCancellationServlet cancellationServlet =
- new QueryCancellationServlet(new ConcurrentHashMap<>(), new String[] { "/" });
+ CcQueryCancellationServlet cancellationServlet =
+ new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
// Adds mocked Hyracks client connection into the servlet context.
IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
// Adds a query context into the servlet context.
IStatementExecutorContext queryCtx = new StatementExecutorContext();
cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
-
+ Mockito.when(appCtx.getHcc()).thenReturn(mockHcc);
// Tests the case that query is not in the map.
IServletRequest mockRequest = mockRequest("1");
IServletResponse mockResponse = mock(IServletResponse.class);
@@ -63,7 +66,7 @@
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
// Tests the case that query is in the map.
- queryCtx.put("1", new JobId(1));
+ queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1)));
cancellationServlet.handle(mockRequest, mockResponse);
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK);
@@ -73,7 +76,7 @@
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST);
// Tests the case that the job cancellation hit some exception from Hyracks.
- queryCtx.put("2", new JobId(2));
+ queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2)));
Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
mockRequest = mockRequest("2");
cancellationServlet.handle(mockRequest, mockResponse);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
new file mode 100644
index 0000000..30759de
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClientRequest {
+
+ /**
+ * Mark the request as complete, non-cancellable anymore
+ */
+ void complete();
+
+ /**
+ * Cancel a request
+ *
+ * @param appCtx
+ * @throws HyracksDataException
+ */
+ void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7f8d31d..f2be6cd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1545,6 +1545,7 @@
}
}
+ @Override
public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
throws AlgebricksException {