[ASTERIXDB-3343][API] Include job details in active/completed requests

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Include job status, create time, start time, queue time, and end time.

Change-Id: Id1bdc935be8bf84674aa6e35ee6c19f2ee1f7971
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18148
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
(cherry picked from commit b767d093235ee01b2b39d98f64592f0ffd822cf4)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index db633e6..25fcbf5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,7 +20,6 @@
 
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,8 +100,8 @@
         json.put("elapsedTime", getElapsedTimeInSecs());
         json.put("node", requestReference.getNode());
         json.put("state", state.getLabel());
-        json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
-        json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+        json.put("userAgent", requestReference.getUserAgent());
+        json.put("remoteAddr", requestReference.getRemoteAddr());
         json.put("cancellable", cancellable);
         return json;
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4f19366..28549f3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,11 +18,17 @@
  */
 package org.apache.asterix.translator;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.AMutableDateTime;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -31,14 +37,17 @@
     protected final long creationTime = System.nanoTime();
     protected final Thread executor;
     protected final String statement;
+    protected final ClusterControllerService ccService;
     protected final String clientContextId;
     protected volatile JobId jobId;
+    private String plan; // can be null
 
-    public ClientRequest(ICommonRequestParameters requestParameters) {
+    public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
         this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
+        this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
     }
 
     @Override
@@ -46,6 +55,10 @@
         return clientContextId;
     }
 
+    public void setPlan(String plan) {
+        this.plan = plan;
+    }
+
     public synchronized void setJobId(JobId jobId) {
         this.jobId = jobId;
         setRunning();
@@ -78,9 +91,53 @@
     @Override
     public ObjectNode asJson() {
         ObjectNode json = super.asJson();
-        json.put("jobId", jobId != null ? jobId.toString() : null);
+        putJobDetails(json);
         json.put("statement", statement);
         json.put("clientContextID", clientContextId);
+        if (plan != null) {
+            json.put("plan", plan);
+        }
         return json;
     }
+
+    private void putJobDetails(ObjectNode json) {
+        if (jobId == null) {
+            json.putNull("jobId");
+        } else {
+            try {
+                json.put("jobId", jobId.toString());
+                JobRun jobRun = ccService.getJobManager().get(jobId);
+                if (jobRun != null) {
+                    json.put("jobStatus", String.valueOf(jobRun.getStatus()));
+                    putJobRequiredResources(json, jobRun);
+                    putTimes(json, jobRun);
+                }
+            } catch (Throwable th) {
+                // ignore
+            }
+        }
+    }
+
+    private static void putTimes(ObjectNode json, JobRun jobRun) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
+        putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
+        putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
+    }
+
+    private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
+        IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
+        if (jobCapacity != null) {
+            json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
+            json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize());
+        }
+    }
+
+    private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) {
+        if (time > 0) {
+            dateTime.setValue(time);
+            json.put(label, dateTime.toSimpleString());
+        }
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 84bee6a..6893d5c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.http.HttpHeaders;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -46,8 +47,9 @@
     }
 
     @Override
-    public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
-        return new ClientRequest(requestParameters);
+    public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+            throws HyracksDataException {
+        return new ClientRequest(requestParameters, appCtx);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 7ac1431..fa1baa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -22,6 +22,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.translator.ResultMetadata;
@@ -97,7 +98,7 @@
                     }
                 }
             }
-            metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos());
+            metadata.setQueueWaitTimeInNanos(TimeUnit.MILLISECONDS.toNanos(run.getQueueWaitTimeInMillis()));
         }
         metadata.setProcessedObjects(processedObjects);
         metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0db8dff..1110cd3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3682,8 +3682,8 @@
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
-            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams,
+            IStatementRewriter stmtRewriter) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String datasetName = stmtInsertUpsert.getDatasetName();
         metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName,
@@ -3720,10 +3720,11 @@
                 throw e;
             }
         };
-
+        IRequestTracker requestTracker = appCtx.getRequestTracker();
+        ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    requestParameters, false);
+                    reqParams, false, clientRequest);
         } else {
             locker.lock();
             try {
@@ -4726,13 +4727,13 @@
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true);
+                requestParameters, true, clientRequest);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
+            ClientRequest clientRequest) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -4748,7 +4749,7 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
-                    updateJobStats(id, stats, metadataProvider.getResultSetId());
+                    updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
                     responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, sessionOutput));
                     responsePrinter.printResults();
@@ -4756,7 +4757,7 @@
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
-                    updateJobStats(id, stats, metadataProvider.getResultSetId());
+                    updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
                     responsePrinter.addResultPrinter(
                             new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId)));
                     responsePrinter.printResults();
@@ -4771,7 +4772,8 @@
         }
     }
 
-    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest)
+            throws HyracksDataException {
         final ClusterControllerService controllerService =
                 (ClusterControllerService) appCtx.getServiceContext().getControllerService();
         org.apache.asterix.translator.ResultMetadata resultMetadata =
@@ -4784,6 +4786,7 @@
             stats.setJobProfile(resultMetadata.getJobProfile());
             apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
         }
+        clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
     }
@@ -5234,8 +5237,8 @@
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
-        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
-        appCtx.getRequestTracker().track(clientRequest);
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+        this.appCtx.getRequestTracker().track(clientRequest);
     }
 
     protected void validateStatements(IRequestParameters requestParameters)
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index b38c366..0e063ca 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -35,8 +35,10 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.RequestTracker;
 import org.apache.asterix.translator.ClientRequest;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.junit.Test;
@@ -66,13 +68,18 @@
         // Tests the case that query is not in the map.
         IServletRequest mockRequest = mockRequest("1");
         IServletResponse mockResponse = mock(IServletResponse.class);
+        ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class);
+        ClusterControllerService mockCCService = mock(ClusterControllerService.class);
+        Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx);
+        Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService);
+        Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService);
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
         final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
         RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
                 null, null, "1", null, null, null, true);
-        ClientRequest request = new ClientRequest(requestParameters);
+        ClientRequest request = new ClientRequest(requestParameters, appCtx);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -89,7 +96,7 @@
         final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
         requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
                 null, null, null, true);
-        ClientRequest request2 = new ClientRequest(requestParameters);
+        ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 95ed22e..f0b4944 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.api;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 
@@ -35,10 +36,12 @@
      * Generates a {@link IClientRequest} based on the requests parameters
      *
      * @param requestParameters
+     * @param appCtx
      * @return the client request
      * @throws HyracksDataException
      */
-    IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+            throws HyracksDataException;
 
     /**
      * Ensures a client's request can be executed before its job is started
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index d865b4f..da4d12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -82,13 +82,13 @@
 
     private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
 
-    private long createTime;
+    private final long createTime;
 
-    private long startTime;
+    private volatile long startTime;
 
     private String startTimeZoneId;
 
-    private long endTime;
+    private volatile long endTime;
 
     private JobStatus status;
 
@@ -98,7 +98,7 @@
 
     private List<Exception> pendingExceptions;
 
-    private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+    private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
     private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec,
             ActivityClusterGraph acg) {
@@ -222,6 +222,10 @@
         this.profile.setEndTime(endTime);
     }
 
+    public long getQueueWaitTimeInMillis() {
+        return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime;
+    }
+
     public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
         operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 40bc1ba..2031808 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -28,7 +28,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.TaskId;
@@ -92,10 +91,6 @@
         this.endTime = endTime;
     }
 
-    public long getQueueWaitTimeInNanos() {
-        return TimeUnit.MILLISECONDS.toNanos(startTime - createTime);
-    }
-
     @Override
     public ObjectNode toJSON() {
         ObjectMapper om = new ObjectMapper();