[NO ISSUE][MISC] Introduce IClientRequest

- user model changes: no
- storage format changes: no
- interface changes: yes
  + IClientRequest: used to represent a client request
    that can be cancelled.

Details:
- Introduce IClientRequest to allow for multiple types of requests
  to be cancellable.

Change-Id: I8f65da1744ea7ecf26ea3f8a576ebaf4472ccd62
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2774
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
index 15267aa..a37f802 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
@@ -22,6 +22,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -45,10 +46,11 @@
      * @param requestParameters
      * @param metadataProvider
      * @param resultSetId
+     * @param executorCtx
      * @throws HyracksDataException
      * @throws AlgebricksException
      */
     public abstract void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor,
-            IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId)
-            throws HyracksDataException, AlgebricksException;
+            IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId,
+            IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException;
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
new file mode 100644
index 0000000..a9bd856
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class BaseClientRequest implements IClientRequest {
+    protected final IStatementExecutorContext ctx;
+    protected final String contextId;
+    private boolean complete;
+
+    public BaseClientRequest(IStatementExecutorContext ctx, String contextId) {
+        this.ctx = ctx;
+        this.contextId = contextId;
+    }
+
+    @Override
+    public synchronized void complete() {
+        if (complete) {
+            return;
+        }
+        complete = true;
+        ctx.remove(contextId);
+    }
+
+    @Override
+    public synchronized void cancel(ICcApplicationContext appCtx) throws HyracksDataException {
+        if (complete) {
+            return;
+        }
+        complete();
+        doCancel(appCtx);
+    }
+
+    protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
new file mode 100644
index 0000000..520ce03
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+public class ClientJobRequest extends BaseClientRequest {
+    private final JobId jobId;
+
+    public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) {
+        super(ctx, clientCtxId);
+        this.jobId = jobId;
+    }
+
+    @Override
+    protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
+        IHyracksClientConnection hcc = appCtx.getHcc();
+        try {
+            hcc.cancelJob(jobId);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+        ctx.remove(contextId);
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 81e1ebf..78080f3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -19,32 +19,31 @@
 
 package org.apache.asterix.translator;
 
-import org.apache.hyracks.api.job.JobId;
+import org.apache.asterix.common.api.IClientRequest;
 
 /**
- * The context for statement executors, which maintains the meta information of all queries.
- * TODO(yingyi): also maintain the mapping from server generated request ids to jobs.
+ * The context for statement executors. Maintains ongoing user requests.
  */
 public interface IStatementExecutorContext {
 
     /**
-     * Gets the Hyracks JobId from the user-provided client context id.
+     * Gets the client request from the user-provided client context id.
      *
      * @param clientContextId,
      *            a user provided client context id.
-     * @return the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
+     * @return the client request
      */
-    JobId getJobIdFromClientContextId(String clientContextId);
+    IClientRequest get(String clientContextId);
 
     /**
-     * Puts a client context id for a statement and the corresponding Hyracks job id.
+     * Puts a client context id for a statement and the corresponding request.
      *
      * @param clientContextId,
      *            a user provided client context id.
-     * @param jobId,
+     * @param req,
      *            the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
      */
-    void put(String clientContextId, JobId jobId);
+    void put(String clientContextId, IClientRequest req);
 
     /**
      * Removes the information about the query corresponding to a user-provided client context id.
@@ -52,5 +51,5 @@
      * @param clientContextId,
      *            a user provided client context id.
      */
-    JobId removeJobIdFromClientContextId(String clientContextId);
+    IClientRequest remove(String clientContextId);
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
index ec181bb..c4e2859 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.translator;
 
-import org.apache.hyracks.api.job.JobId;
+import org.apache.asterix.common.api.IClientRequest;
 
 public class NoOpStatementExecutorContext implements IStatementExecutorContext {
 
@@ -28,17 +28,17 @@
     }
 
     @Override
-    public JobId getJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest get(String clientContextId) {
         return null;
     }
 
     @Override
-    public void put(String clientContextId, JobId jobId) {
+    public void put(String clientContextId, IClientRequest req) {
         // Dummy for when a statement doesn't support cancellation
     }
 
     @Override
-    public JobId removeJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest remove(String clientContextId) {
         return null;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
index 7c06762..a4da189 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -16,31 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.api.http.ctx;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.job.JobId;
 
 public class StatementExecutorContext implements IStatementExecutorContext {
 
-    private final Map<String, JobId> runningQueries = new ConcurrentHashMap<>();
+    private final Map<String, IClientRequest> runningQueries = new ConcurrentHashMap<>();
 
     @Override
-    public JobId getJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest get(String clientContextId) {
         return runningQueries.get(clientContextId);
     }
 
     @Override
-    public void put(String clientContextId, JobId jobId) {
-        runningQueries.put(clientContextId, jobId);
+    public void put(String clientContextId, IClientRequest req) {
+        runningQueries.put(clientContextId, req);
     }
 
     @Override
-    public JobId removeJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest remove(String clientContextId) {
         return runningQueries.remove(clientContextId);
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
similarity index 76%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
index a8b3aef..5f5692d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
@@ -22,9 +22,9 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter;
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.AbstractServlet;
@@ -37,11 +37,14 @@
 /**
  * The servlet provides a REST API for cancelling an on-going query.
  */
-public class QueryCancellationServlet extends AbstractServlet {
+public class CcQueryCancellationServlet extends AbstractServlet {
     private static final Logger LOGGER = LogManager.getLogger();
+    private final ICcApplicationContext appCtx;
 
-    public QueryCancellationServlet(ConcurrentMap<String, Object> ctx, String... paths) {
+    public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx,
+            String... paths) {
         super(ctx, paths);
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -51,23 +54,17 @@
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-
-        // Retrieves the corresponding Hyracks job id.
-        IStatementExecutorContext runningQueries =
+        IStatementExecutorContext executorCtx =
                 (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
-        IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(ServletConstants.HYRACKS_CONNECTION_ATTR);
-        JobId jobId = runningQueries.getJobIdFromClientContextId(clientContextId);
-
-        if (jobId == null) {
+        IClientRequest req = executorCtx.get(clientContextId);
+        if (req == null) {
             // response: NOT FOUND
             response.setStatus(HttpResponseStatus.NOT_FOUND);
             return;
         }
         try {
             // Cancels the on-going job.
-            hcc.cancelJob(jobId);
-            // Removes the cancelled query from the map activeQueries.
-            runningQueries.removeJobIdFromClientContextId(clientContextId);
+            req.cancel(appCtx);
             // response: OK
             response.setStatus(HttpResponseStatus.OK);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 5ad451f..60806d3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -41,7 +42,7 @@
 /**
  * The servlet provides a REST API on an NC for cancelling an on-going query.
  */
-public class NCQueryCancellationServlet extends QueryCancellationServlet {
+public class NCQueryCancellationServlet extends AbstractServlet {
     private static final Logger LOGGER = LogManager.getLogger();
     private final INCServiceContext serviceCtx;
     private final INCMessageBroker messageBroker;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ce6185e..316646d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -53,9 +53,9 @@
 import org.apache.asterix.translator.ExecutionPlansJsonPrintUtil;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index a1c87d8..943aad3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -18,15 +18,14 @@
  */
 package org.apache.asterix.app.message;
 
+import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.utils.RequestStatus;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -51,17 +50,16 @@
         ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
         CCApplication application = (CCApplication) ccs.getApplication();
         IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
-        JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+        IClientRequest req = executorsCtx.get(contextId);
         RequestStatus status;
 
-        if (jobId == null) {
+        if (req == null) {
             LOGGER.log(Level.WARN, "No job found for context id " + contextId);
             status = RequestStatus.NOT_FOUND;
         } else {
             try {
-                IHyracksClientConnection hcc = application.getHcc();
-                hcc.cancelJob(jobId);
-                executorsCtx.removeJobIdFromClientContextId(contextId);
+                req.cancel(appCtx);
+                executorsCtx.remove(contextId);
                 status = RequestStatus.SUCCESS;
             } catch (Exception e) {
                 LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 469a2dd..0d884ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -152,6 +152,7 @@
 import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
+import org.apache.asterix.translator.ClientJobRequest;
 import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -408,7 +409,7 @@
                         break;
                     case EXTENSION:
                         ((ExtensionStatement) stmt).handle(hcc, this, requestParameters, metadataProvider,
-                                resultSetIdCounter);
+                                resultSetIdCounter, ctx);
                         break;
                     default:
                         throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(),
@@ -2601,6 +2602,7 @@
     private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
             String clientContextId, IStatementExecutorContext ctx) throws Exception {
+        ClientJobRequest req = null;
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
@@ -2609,7 +2611,8 @@
             }
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (ctx != null && clientContextId != null) {
-                ctx.put(clientContextId, jobId); // Adds the running job into the context.
+                req = new ClientJobRequest(ctx, clientContextId, jobId);
+                ctx.put(clientContextId, req); // Adds the running job into the context.
             }
             if (jId != null) {
                 jId.setValue(jobId);
@@ -2624,8 +2627,8 @@
         } finally {
             locker.unlock();
             // No matter the job succeeds or fails, removes it into the context.
-            if (ctx != null && clientContextId != null) {
-                ctx.removeJobIdFromClientContextId(clientContextId);
+            if (req != null) {
+                req.complete();
             }
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index b180ef0..f4d24e2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -36,12 +36,12 @@
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
+import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
 import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
 import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.QueryCancellationServlet;
 import org.apache.asterix.api.http.server.QueryResultApiServlet;
 import org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.api.http.server.QueryStatusApiServlet;
@@ -271,7 +271,7 @@
     protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) {
         switch (key) {
             case Servlets.RUNNING_REQUESTS:
-                return new QueryCancellationServlet(ctx, paths);
+                return new CcQueryCancellationServlet(ctx, appCtx, paths);
             case Servlets.QUERY_STATUS:
                 return new QueryStatusApiServlet(ctx, appCtx, paths);
             case Servlets.QUERY_RESULT:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index d5262cf..eae82af 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -28,8 +28,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
-import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.translator.ClientJobRequest;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
@@ -46,16 +48,17 @@
 
     @Test
     public void testDelete() throws Exception {
+        ICcApplicationContext appCtx = mock(ICcApplicationContext.class);
         // Creates a query cancellation servlet.
-        QueryCancellationServlet cancellationServlet =
-                new QueryCancellationServlet(new ConcurrentHashMap<>(), new String[] { "/" });
+        CcQueryCancellationServlet cancellationServlet =
+                new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
         // Adds mocked Hyracks client connection into the servlet context.
         IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
         cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
         // Adds a query context into the servlet context.
         IStatementExecutorContext queryCtx = new StatementExecutorContext();
         cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
-
+        Mockito.when(appCtx.getHcc()).thenReturn(mockHcc);
         // Tests the case that query is not in the map.
         IServletRequest mockRequest = mockRequest("1");
         IServletResponse mockResponse = mock(IServletResponse.class);
@@ -63,7 +66,7 @@
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
         // Tests the case that query is in the map.
-        queryCtx.put("1", new JobId(1));
+        queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1)));
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK);
 
@@ -73,7 +76,7 @@
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST);
 
         // Tests the case that the job cancellation hit some exception from Hyracks.
-        queryCtx.put("2", new JobId(2));
+        queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2)));
         Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
         mockRequest = mockRequest("2");
         cancellationServlet.handle(mockRequest, mockResponse);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
new file mode 100644
index 0000000..30759de
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClientRequest {
+
+    /**
+     * Mark the request as complete, non-cancellable anymore
+     */
+    void complete();
+
+    /**
+     * Cancel a request
+     *
+     * @param appCtx
+     * @throws HyracksDataException
+     */
+    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7f8d31d..f2be6cd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1545,6 +1545,7 @@
         }
     }
 
+    @Override
     public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
             throws AlgebricksException {