[ASTERIXDB-3343][API] Add servlet to get acitve/completed requests

- user model changes: no
- storage format changes: no
- interface changes: yes

Backports from:
[ASTERIXDB-3343][API] Add servlet to get completed requests
(cherry picked from commit 88c25279458badf088ae36ecef2bf50a66d9638c)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18109

[ASTERIXDB-3343][API] Include job details in active/completed requests
(cherry picked from commit b767d093235ee01b2b39d98f64592f0ffd822cf4)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110

[ASTERIXDB-3343][API] Capture job state changes in client requests
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18279

[ASTERIXDB-3343][API] Add redact param to redact active/completed requests
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283

[ASTERIXDB-3343][API] Return new list when getting completed requests
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18306

Ext-ref: MB-62288
Change-Id: Iaf0ad268cb0629c1314d983cb30a499d554dafd4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18495
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 99cda09..0fafc47 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,7 +20,6 @@
 
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -93,15 +92,25 @@
         return JSONUtil.convertNodeUnchecked(asJson());
     }
 
-    protected ObjectNode asJson() {
+    @Override
+    public ObjectNode asJson() {
+        return putJson();
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        return putJson();
+    }
+
+    private ObjectNode putJson() {
         ObjectNode json = JSONUtil.createObject();
         json.put("uuid", requestReference.getUuid());
         json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
         json.put("elapsedTime", getElapsedTimeInSecs());
         json.put("node", requestReference.getNode());
         json.put("state", state.getLabel());
-        json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
-        json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+        json.put("userAgent", requestReference.getUserAgent());
+        json.put("remoteAddr", requestReference.getRemoteAddr());
         json.put("cancellable", cancellable);
         return json;
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index c19bb02..31f1979 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,11 +18,22 @@
  */
 package org.apache.asterix.translator;
 
+import static org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.AMutableDateTime;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -32,13 +43,16 @@
     protected final Thread executor;
     protected final String statement;
     protected final String clientContextId;
+    protected final JobState jobState;
     protected volatile JobId jobId;
+    private volatile String plan; // can be null
 
     public ClientRequest(ICommonRequestParameters requestParameters) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
         this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
+        this.jobState = new JobState();
     }
 
     @Override
@@ -46,6 +60,10 @@
         return clientContextId;
     }
 
+    public void setPlan(String plan) {
+        this.plan = plan;
+    }
+
     public synchronized void setJobId(JobId jobId) {
         this.jobId = jobId;
         setRunning();
@@ -76,11 +94,93 @@
     }
 
     @Override
-    protected ObjectNode asJson() {
+    public ObjectNode asJson() {
         ObjectNode json = super.asJson();
-        json.put("jobId", jobId != null ? jobId.toString() : null);
-        json.put("statement", statement);
+        return asJson(json, false);
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        ObjectNode json = super.asRedactedJson();
+        return asJson(json, true);
+    }
+
+    private ObjectNode asJson(ObjectNode json, boolean redact) {
+        putJobDetails(json, redact);
+        json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement);
         json.put("clientContextID", clientContextId);
+        if (plan != null) {
+            json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
+        }
         return json;
     }
+
+    @Override
+    public void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status) {
+        jobState.createTime = System.currentTimeMillis();
+        jobState.status = status == QUEUE ? JobStatus.PENDING : JobStatus.RUNNING;
+        jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+        jobState.requiredMemoryInBytes = requiredClusterCapacity.getAggregatedMemoryByteSize();
+    }
+
+    @Override
+    public void jobStarted(JobId jobId) {
+        jobState.startTime = System.currentTimeMillis();
+        jobState.status = JobStatus.RUNNING;
+    }
+
+    @Override
+    public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+        jobState.endTime = System.currentTimeMillis();
+        jobState.status = jobStatus;
+        if (exceptions != null && !exceptions.isEmpty()) {
+            jobState.errorMsg = processException(exceptions.get(0));
+        }
+    }
+
+    protected String processException(Exception e) {
+        return ExceptionUtils.unwrap(e).getMessage();
+    }
+
+    private void putJobDetails(ObjectNode json, boolean redact) {
+        try {
+            json.put("jobId", jobId != null ? jobId.toString() : null);
+            putJobState(json, jobState, redact);
+        } catch (Throwable th) {
+            // ignore
+        }
+    }
+
+    private static void putJobState(ObjectNode json, JobState state, boolean redact) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        putTime(json, state.createTime, "jobCreateTime", dateTime);
+        putTime(json, state.startTime, "jobStartTime", dateTime);
+        putTime(json, state.endTime, "jobEndTime", dateTime);
+        long queueTime = (state.startTime > 0 ? state.startTime : System.currentTimeMillis()) - state.createTime;
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+        json.put("jobStatus", String.valueOf(state.status));
+        json.put("jobRequiredCPUs", state.requiredCPUs);
+        json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+        if (state.errorMsg != null) {
+            json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
+        }
+    }
+
+    private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) {
+        if (time > 0) {
+            dateTime.setValue(time);
+            json.put(label, dateTime.toSimpleString());
+        }
+    }
+
+    static class JobState {
+        volatile long createTime;
+        volatile long startTime;
+        volatile long endTime;
+        volatile long requiredMemoryInBytes;
+        volatile int requiredCPUs;
+        volatile JobStatus status;
+        volatile String errorMsg;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
new file mode 100644
index 0000000..b6c6a71
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public abstract class AbstractRequestsServlet extends AbstractServlet {
+
+    public static final String REDACT_PARAM = "redact";
+    protected final ICcApplicationContext appCtx;
+
+    public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
+        super(ctx, paths);
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) throws Exception {
+        ArrayNode requestsJson = JSONUtil.createArray();
+        Collection<IClientRequest> requests = getRequests();
+        String redact = request.getParameter(REDACT_PARAM);
+        if (Boolean.parseBoolean(redact)) {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asRedactedJson());
+            }
+        } else {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asJson());
+            }
+        }
+        HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
+        response.setStatus(HttpResponseStatus.OK);
+        JSONUtil.writeNode(response.writer(), requestsJson);
+        response.writer().flush();
+    }
+
+    abstract Collection<IClientRequest> getRequests();
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java
similarity index 83%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java
index 7ba2867..7e00e9a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.api.http.server;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter;
@@ -27,7 +28,6 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -35,24 +35,27 @@
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 /**
- * The servlet provides a REST API for cancelling an on-going query.
+ * The servlet provides a REST API for getting the running queries or cancelling an on-going one.
  */
-public class CcQueryCancellationServlet extends AbstractServlet {
+public class ActiveRequestsServlet extends AbstractRequestsServlet {
 
     public static final String REQUEST_UUID_PARAM_NAME = "request_id";
     private static final Logger LOGGER = LogManager.getLogger();
-    private final ICcApplicationContext appCtx;
 
-    public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx,
-            String... paths) {
-        super(ctx, paths);
-        this.appCtx = appCtx;
+    public ActiveRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
+        super(ctx, appCtx, paths);
+    }
+
+    @Override
+    public Collection<IClientRequest> getRequests() {
+        return appCtx.getRequestTracker().getRunningRequests();
     }
 
     @Override
     protected void delete(IServletRequest request, IServletResponse response) throws IOException {
         String uuid = request.getParameter(REQUEST_UUID_PARAM_NAME);
         String clientCtxId = request.getParameter(Parameter.CLIENT_ID.str());
+        LOGGER.debug("received cancel request, uuid={}, clientCtxId={}", uuid, clientCtxId);
         if (uuid == null && clientCtxId == null) {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java
new file mode 100644
index 0000000..92eacbb
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+
+public class CompletedRequestsServlet extends AbstractRequestsServlet {
+
+    public CompletedRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
+        super(ctx, appCtx, paths);
+    }
+
+    @Override
+    public Collection<IClientRequest> getRequests() {
+        return appCtx.getRequestTracker().getCompletedRequests();
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index b2134dc..5dd9430 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.server.CcQueryCancellationServlet.REQUEST_UUID_PARAM_NAME;
+import static org.apache.asterix.api.http.server.ActiveRequestsServlet.REQUEST_UUID_PARAM_NAME;
 import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
 
 import java.util.concurrent.ConcurrentMap;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 662884d..0f2780f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.util.SingleThreadEventProcessor;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
@@ -89,7 +90,8 @@
     // *** IJobLifecycleListener
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
+    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException {
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
             if (property != null) {
@@ -118,7 +120,7 @@
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
@@ -127,8 +129,8 @@
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             LOGGER.debug("notified of ingestion job finish {}", jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6b40cbf..31b903b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3664,8 +3664,8 @@
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
-            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams,
+            IStatementRewriter stmtRewriter) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String datasetName = stmtInsertUpsert.getDatasetName();
         metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName,
@@ -3702,10 +3702,11 @@
                 throw e;
             }
         };
-
+        IRequestTracker requestTracker = appCtx.getRequestTracker();
+        ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    requestParameters, false);
+                    reqParams, false, clientRequest);
         } else {
             locker.lock();
             try {
@@ -4702,13 +4703,13 @@
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true);
+                requestParameters, true, clientRequest);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
+            ClientRequest clientRequest) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -4724,7 +4725,7 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
-                    updateJobStats(id, stats, metadataProvider.getResultSetId());
+                    updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
                     responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, sessionOutput));
                     responsePrinter.printResults();
@@ -4732,7 +4733,7 @@
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
-                    updateJobStats(id, stats, metadataProvider.getResultSetId());
+                    updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
                     responsePrinter.addResultPrinter(
                             new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId)));
                     responsePrinter.printResults();
@@ -4747,7 +4748,8 @@
         }
     }
 
-    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest)
+            throws HyracksDataException {
         final ClusterControllerService controllerService =
                 (ClusterControllerService) appCtx.getServiceContext().getControllerService();
         org.apache.asterix.translator.ResultMetadata resultMetadata =
@@ -4757,6 +4759,7 @@
         if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
             stats.setJobProfile(resultMetadata.getJobProfile());
         }
+        clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
     }
@@ -4836,6 +4839,7 @@
             appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
+            jobSpec.setRequestId(clientRequest.getId());
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId,
@@ -5208,7 +5212,7 @@
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
         final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
-        appCtx.getRequestTracker().track(clientRequest);
+        this.appCtx.getRequestTracker().track(clientRequest);
     }
 
     protected void validateStatements(IRequestParameters requestParameters)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index bc67823..31ec44c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -37,9 +37,9 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.api.http.IQueryWebServerRegistrant;
+import org.apache.asterix.api.http.server.ActiveRequestsServlet;
 import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
@@ -171,6 +171,7 @@
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties, appCtx);
         ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
+        ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker());
 
         // create event loop groups
         webManager = new WebManager();
@@ -315,7 +316,7 @@
         ConcurrentMap<String, Object> ctx = server.ctx();
         switch (key) {
             case Servlets.RUNNING_REQUESTS:
-                return new CcQueryCancellationServlet(ctx, appCtx, paths);
+                return new ActiveRequestsServlet(ctx, appCtx, paths);
             case Servlets.QUERY_STATUS:
                 return new QueryStatusApiServlet(ctx, appCtx, paths);
             case Servlets.QUERY_RESULT:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 68fb9a8..673611d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -27,7 +27,7 @@
 
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
+import org.apache.asterix.api.http.server.ActiveRequestsServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.app.translator.RequestParameters;
 import org.apache.asterix.common.api.RequestReference;
@@ -35,8 +35,10 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.RequestTracker;
 import org.apache.asterix.translator.ClientRequest;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.junit.Test;
@@ -57,8 +59,8 @@
         RequestTracker tracker = new RequestTracker(appCtx);
         Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker);
         // Creates a query cancellation servlet.
-        CcQueryCancellationServlet cancellationServlet =
-                new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
+        ActiveRequestsServlet cancellationServlet =
+                new ActiveRequestsServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
         // Adds mocked Hyracks client connection into the servlet context.
         IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
         cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
@@ -66,6 +68,11 @@
         // Tests the case that query is not in the map.
         IServletRequest mockRequest = mockRequest("1");
         IServletResponse mockResponse = mock(IServletResponse.class);
+        ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class);
+        ClusterControllerService mockCCService = mock(ClusterControllerService.class);
+        Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx);
+        Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService);
+        Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService);
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index cb123bf..c9d862c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -135,8 +136,9 @@
         TestUserActor user = new TestUserActor("Xikui", mdProvider, null);
         Action start = user.startActivity(eventsListener);
         startingSubscriber.sync();
-        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
-        activeJobNotificationHandler.notifyJobStart(jobId);
+        activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec,
+                IJobCapacityController.JobSubmissionStatus.EXECUTE);
+        activeJobNotificationHandler.notifyJobStart(jobId, jobSpec);
         try {
             eventsListener.refreshStats(1000);
         } catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 52d4225..883a0cb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.mockito.Mockito;
 
 public class TestClusterControllerActor extends Actor {
@@ -49,8 +50,8 @@
                 JobSpecification jobSpecification = Mockito.mock(JobSpecification.class);
                 Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
                         .thenReturn(entityId);
-                handler.notifyJobCreation(jobId, jobSpecification);
-                handler.notifyJobStart(jobId);
+                handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE);
+                handler.notifyJobStart(jobId, null);
             }
         };
         add(startJob);
@@ -72,7 +73,7 @@
         Action delivery = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                handler.notifyJobFinish(jobId, jobStatus, exceptions);
+                handler.notifyJobFinish(jobId, null, jobStatus, exceptions);
             }
         };
         add(delivery);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e..07f8eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
  */
  -- param client_context_id=ensure_running_query
  -- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst
 WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
new file mode 100644
index 0000000..de35939
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b..170838e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000..15affd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+    "cancellable": true,
+    "clientContextID": "completed_requests_query",
+    "elapsedTime": "R{.*}",
+    "jobCreateTime": "R{.*}",
+    "jobEndTime": "R{.*}",
+    "jobId": "R{.*}",
+    "jobQueueTime": "R{.*}",
+    "jobRequiredCPUs": "R{.*}",
+    "jobRequiredMemory": "R{.*}",
+    "jobStartTime": "R{.*}",
+    "jobStatus": "TERMINATED",
+    "node": "R{.*}",
+    "remoteAddr": "R{.*}",
+    "requestTime": "R{.*}",
+    "state": "completed",
+    "userAgent": "R{.*}",
+    "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 921fb64..ee518dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,8 +18,16 @@
  */
 package org.apache.asterix.common.api;
 
+import java.util.List;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public interface IClientRequest {
 
@@ -86,7 +94,44 @@
     void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
 
     /**
-     * @return A json representation of this request
+     * @return A json string representation of this request
      */
     String toJson();
+
+    /**
+     * @return A json node representation of this request
+     */
+    ObjectNode asJson();
+
+    /**
+     * @return A redacted json node representation of this request
+     */
+    ObjectNode asRedactedJson();
+
+    /**
+     * Called when the job is created.
+     *
+     * @param jobId the job id
+     * @param requiredClusterCapacity the required resources by the job
+     * @param status the status of the job; whether it will be executed or queued
+     */
+    void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+            IJobCapacityController.JobSubmissionStatus status);
+
+    /**
+     * Called when the job starts running.
+     *
+     * @param jobId the job id
+     */
+    void jobStarted(JobId jobId);
+
+    /**
+     * Called when the job finishes.
+     *
+     * @param jobId the job id
+     * @param jobStatus the final job status
+     * @param exceptions exceptions encountered if any
+     */
+    void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions);
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 95ed22e..153eea9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -34,9 +34,9 @@
     /**
      * Generates a {@link IClientRequest} based on the requests parameters
      *
-     * @param requestParameters
+     * @param requestParameters the request parameters
      * @return the client request
-     * @throws HyracksDataException
+     * @throws HyracksDataException HyracksDataException
      */
     IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015..5a99f09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -21,8 +21,9 @@
 import java.util.Collection;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
 
-public interface IRequestTracker {
+public interface IRequestTracker extends IJobLifecycleListener {
 
     /**
      * Starts tracking {@code request}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a..02f20f8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 @ThreadSafe
@@ -44,17 +45,19 @@
     private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) {
         getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) {
         // nothing to do
     }
 
     @Override
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) {
         nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6..a754eb6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,20 +31,32 @@
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 public class RequestTracker implements IRequestTracker {
 
     private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
     private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
-    private final CircularFifoQueue<IClientRequest> completedRequests;
+    private final Map<String, IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
 
     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
-        completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+        int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+        completedRequests = new LinkedHashMap<>(archiveSize) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) {
+                return size() > archiveSize;
+            }
+        };
         numRequests = new AtomicLong(0);
     }
 
@@ -94,7 +108,7 @@
 
     @Override
     public synchronized Collection<IClientRequest> getCompletedRequests() {
-        return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+        return Collections.unmodifiableCollection(new ArrayList<>(completedRequests.values()));
     }
 
     private void cancel(IClientRequest request) throws HyracksDataException {
@@ -114,10 +128,55 @@
     }
 
     private synchronized void archive(IClientRequest request) {
-        completedRequests.add(request);
+        completedRequests.put(request.getId(), request);
     }
 
     public long getTotalNumberOfRequests() {
         return numRequests.get();
     }
+
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobStarted(jobId);
+            }
+        }
+    }
+
+    @Override
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
+        String requestId = spec.getRequestId();
+        if (requestId != null) {
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobFinished(jobId, jobStatus, exceptions);
+            }
+        }
+    }
+
+    private IClientRequest getRequest(String requestId) {
+        IClientRequest clientRequest = runningRequests.get(requestId);
+        if (clientRequest != null) {
+            return clientRequest;
+        }
+        synchronized (this) {
+            return completedRequests.get(requestId);
+        }
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
index be75ecb..82d8cb7 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -50,18 +51,18 @@
         jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
 
         JobId jobId = new JobId(1);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
         // make sure nc1 has a pending job
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
-        nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+        nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, null);
         // make sure nc1 doesn't have pending jobs anymore
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
 
         // make sure node doesn't have pending jobs after failure
         jobId = new JobId(2);
-        nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+        nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
         nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
         Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 338c331..6773dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * A listener for job related events
@@ -29,27 +30,33 @@
     /**
      * Notify the listener that a job has been created
      *
-     * @param jobId
-     * @param spec
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param status the status of the job; whether it will be executed or queued
+     * @throws HyracksException HyracksException
      */
-    void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+    void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException;
 
     /**
      * Notify the listener that the job has started on the cluster controller
      *
-     * @param jobId
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     *
+     * @throws HyracksException HyracksException
      */
-    void notifyJobStart(JobId jobId) throws HyracksException;
+    void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException;
 
     /**
      * Notify the listener that the job has been terminated, passing exceptions in case of failure
      *
-     * @param jobId
-     * @param jobStatus
-     * @param exceptions
-     * @throws HyracksException
+     * @param jobId the job id
+     * @param spec the job specification
+     * @param jobStatus the job status
+     * @param exceptions the job exceptions
+     * @throws HyracksException HyracksException
      */
-    void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
+    void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 58336a0..6d0ce6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -79,6 +79,8 @@
 
     private long maxWarnings;
 
+    private String requestId;
+
     private IJobletEventListenerFactory jobletEventListenerFactory;
 
     private IGlobalJobDataFactory globalJobDataFactory;
@@ -254,6 +256,14 @@
         this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
     }
 
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
     public void setFrameSize(int frameSize) {
         this.frameSize = frameSize;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index de166dd..555d37e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
@@ -79,22 +80,23 @@
         jobLifecycleListeners.add(jobLifecycleListener);
     }
 
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobStart(jobId);
+            l.notifyJobStart(jobId, spec);
         }
     }
 
-    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
-            throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+            List<Exception> exceptions) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobFinish(jobId, jobStatus, exceptions);
+            l.notifyJobFinish(jobId, spec, jobStatus, exceptions);
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobCreation(jobId, spec);
+            l.notifyJobCreation(jobId, spec, status);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index da840af..567d20c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -115,7 +115,7 @@
 
     public void startJob() throws HyracksException {
         startRunnableActivityClusters();
-        ccs.getContext().notifyJobStart(jobRun.getJobId());
+        ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification());
     }
 
     public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index ad97188..d803c88 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -112,7 +112,7 @@
         JobSpecification job = jobRun.getJobSpecification();
         IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
         CCServiceContext serviceCtx = ccs.getContext();
-        serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+        serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
         switch (status) {
             case QUEUE:
                 queueJob(jobRun);
@@ -148,7 +148,8 @@
             CCServiceContext serviceCtx = ccs.getContext();
             if (serviceCtx != null) {
                 try {
-                    serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+                    serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION,
+                            exceptions);
                 } catch (Exception e) {
                     LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
                     throw HyracksDataException.create(e);
@@ -224,7 +225,8 @@
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
         try {
-            serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+            serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
+                    run.getPendingExceptions());
         } catch (Exception e) {
             LOGGER.error("Exception notifying job finish {}", jobId, e);
             caughtException = e;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 0cc09b4..06e955a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -82,13 +82,13 @@
 
     private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
 
-    private long createTime;
+    private final long createTime;
 
-    private long startTime;
+    private volatile long startTime;
 
     private String startTimeZoneId;
 
-    private long endTime;
+    private volatile long endTime;
 
     private JobStatus status;
 
@@ -98,7 +98,7 @@
 
     private List<Exception> pendingExceptions;
 
-    private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+    private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
     private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec,
             ActivityClusterGraph acg) {
@@ -218,6 +218,10 @@
         this.endTime = endTime;
     }
 
+    public long getQueueWaitTimeInMillis() {
+        return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime;
+    }
+
     public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
         operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 80d7630..b6274d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.result.IJobResultCallback;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.IResultStateRecord;
@@ -77,7 +78,8 @@
     }
 
     @Override
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+            IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
         if (jobResultLocations.get(jobId) != null) {
             throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
         }
@@ -85,12 +87,13 @@
     }
 
     @Override
-    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         jobResultLocations.get(jobId).getRecord().start();
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         if (exceptions == null || exceptions.isEmpty()) {
             final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
             if (resultJobRecord == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29..19fdcfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,8 @@
     private final Set<JobId> finishWithoutStart = new HashSet<>();
 
     @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+    public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+            throws HyracksException {
         if (created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has been created before");
             increment(doubleCreated, jobId);
@@ -62,7 +64,7 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         if (!created.containsKey(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been created");
             startWithoutCreate.add(jobId);
@@ -75,7 +77,8 @@
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+            throws HyracksException {
         if (!started.contains(jobId)) {
             LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
             finishWithoutStart.add(jobId);