[ASTERIXDB-3343][API] Include job details in active/completed requests

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Include job status, create time, start time, queue time, and end time.

Change-Id: Id1bdc935be8bf84674aa6e35ee6c19f2ee1f7971
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18148
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index db633e6..0436ea8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,7 +20,6 @@
 
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.api.RequestReference;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -98,11 +97,11 @@
         ObjectNode json = JSONUtil.createObject();
         json.put("uuid", requestReference.getUuid());
         json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
-        json.put("elapsedTime", getElapsedTimeInSecs());
+        json.put("elapsedTime", getElapsedTimeInSecs() + "s");
         json.put("node", requestReference.getNode());
         json.put("state", state.getLabel());
-        json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
-        json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+        json.put("userAgent", requestReference.getUserAgent());
+        json.put("remoteAddr", requestReference.getRemoteAddr());
         json.put("cancellable", cancellable);
         return json;
     }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4f19366..983f99c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,11 +18,18 @@
  */
 package org.apache.asterix.translator;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.api.ICommonRequestParameters;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.AMutableDateTime;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.util.StorageUtil;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -31,14 +38,17 @@
     protected final long creationTime = System.nanoTime();
     protected final Thread executor;
     protected final String statement;
+    protected final ClusterControllerService ccService;
     protected final String clientContextId;
     protected volatile JobId jobId;
+    private String plan; // can be null
 
-    public ClientRequest(ICommonRequestParameters requestParameters) {
+    public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
         super(requestParameters.getRequestReference());
         this.clientContextId = requestParameters.getClientContextId();
         this.statement = requestParameters.getStatement();
         this.executor = Thread.currentThread();
+        this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
     }
 
     @Override
@@ -46,6 +56,10 @@
         return clientContextId;
     }
 
+    public void setPlan(String plan) {
+        this.plan = plan;
+    }
+
     public synchronized void setJobId(JobId jobId) {
         this.jobId = jobId;
         setRunning();
@@ -78,9 +92,53 @@
     @Override
     public ObjectNode asJson() {
         ObjectNode json = super.asJson();
-        json.put("jobId", jobId != null ? jobId.toString() : null);
+        putJobDetails(json);
         json.put("statement", statement);
         json.put("clientContextID", clientContextId);
+        if (plan != null) {
+            json.put("plan", plan);
+        }
         return json;
     }
+
+    private void putJobDetails(ObjectNode json) {
+        if (jobId == null) {
+            json.putNull("jobId");
+        } else {
+            try {
+                json.put("jobId", jobId.toString());
+                JobRun jobRun = ccService.getJobManager().get(jobId);
+                if (jobRun != null) {
+                    json.put("jobStatus", String.valueOf(jobRun.getStatus()));
+                    putJobRequiredResources(json, jobRun);
+                    putTimes(json, jobRun);
+                }
+            } catch (Throwable th) {
+                // ignore
+            }
+        }
+    }
+
+    private static void putTimes(ObjectNode json, JobRun jobRun) {
+        AMutableDateTime dateTime = new AMutableDateTime(0);
+        putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
+        putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
+        putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
+        json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()) + "s");
+    }
+
+    private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
+        IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
+        if (jobCapacity != null) {
+            json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
+            json.put("jobRequiredMemory", StorageUtil.toHumanReadableSize(jobCapacity.getAggregatedMemoryByteSize()));
+        }
+    }
+
+    private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) {
+        if (time > 0) {
+            dateTime.setValue(time);
+            json.put(label, dateTime.toSimpleString());
+        }
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index ed68d8b..6ded860 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.api.IRequestReference;
 import org.apache.asterix.common.api.ISchedulableClientRequest;
 import org.apache.asterix.common.api.RequestReference;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.http.HttpHeaders;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,9 @@
     }
 
     @Override
-    public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
-        return new ClientRequest(requestParameters);
+    public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+            throws HyracksDataException {
+        return new ClientRequest(requestParameters, appCtx);
     }
 
     @Override
@@ -57,7 +59,7 @@
     }
 
     @Override
-    public void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider metadataProvider)
+    public void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider<?, ?> metadataProvider)
             throws HyracksDataException {
 
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index a9a3aac..c419d0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -22,6 +22,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.translator.ResultMetadata;
@@ -97,7 +98,7 @@
                     }
                 }
             }
-            metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos());
+            metadata.setQueueWaitTimeInNanos(TimeUnit.MILLISECONDS.toNanos(run.getQueueWaitTimeInMillis()));
         }
         metadata.setProcessedObjects(processedObjects);
         metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 100e524..34648dd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4101,13 +4101,13 @@
         };
 
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true, null);
+                requestParameters, true, null, clientRequest);
     }
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
-            Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
+            ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams,
+            IStatementRewriter stmtRewriter) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String datasetName = stmtInsertUpsert.getDatasetName();
         metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getNamespace(), datasetName,
@@ -4147,9 +4147,11 @@
                 throw e;
             }
         };
+        IRequestTracker requestTracker = appCtx.getRequestTracker();
+        ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
         if (stmtInsertUpsert.getReturnExpression() != null) {
             deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                    requestParameters, false, stmt);
+                    reqParams, false, stmt, clientRequest);
         } else {
             locker.lock();
             JobId jobId = null;
@@ -4173,9 +4175,6 @@
                             participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
                 }
                 jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
-                final IRequestTracker requestTracker = appCtx.getRequestTracker();
-                final ClientRequest clientRequest =
-                        (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
                 clientRequest.setJobId(jobId);
                 String nameBefore = Thread.currentThread().getName();
                 try {
@@ -5262,13 +5261,13 @@
             }
         };
         deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
-                requestParameters, true, null);
+                requestParameters, true, null, clientRequest);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
             MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
             ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
-            Statement atomicStmt) throws Exception {
+            Statement atomicStmt, ClientRequest clientRequest) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -5284,7 +5283,7 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
-                    updateJobStats(id, stats, metadataProvider.getResultSetId());
+                    updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
                     responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
                             metadataProvider.findOutputRecordType(), stats, sessionOutput));
                     responsePrinter.printResults();
@@ -5292,7 +5291,7 @@
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
-                    updateJobStats(id, stats, metadataProvider.getResultSetId());
+                    updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
                     responsePrinter.addResultPrinter(
                             new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId)));
                     responsePrinter.printResults();
@@ -5307,7 +5306,8 @@
         }
     }
 
-    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+    private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest)
+            throws HyracksDataException {
         final ClusterControllerService controllerService =
                 (ClusterControllerService) appCtx.getServiceContext().getControllerService();
         org.apache.asterix.translator.ResultMetadata resultMetadata =
@@ -5321,6 +5321,7 @@
             stats.setJobProfile(resultMetadata.getJobProfile());
             apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
         }
+        clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
         stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
         WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
     }
@@ -5569,8 +5570,8 @@
     }
 
     protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
-        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
-        appCtx.getRequestTracker().track(clientRequest);
+        final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+        this.appCtx.getRequestTracker().track(clientRequest);
     }
 
     protected void validateStatements(IRequestParameters requestParameters)
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index b38c366..0e063ca 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -35,8 +35,10 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.runtime.utils.RequestTracker;
 import org.apache.asterix.translator.ClientRequest;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.junit.Test;
@@ -66,13 +68,18 @@
         // Tests the case that query is not in the map.
         IServletRequest mockRequest = mockRequest("1");
         IServletResponse mockResponse = mock(IServletResponse.class);
+        ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class);
+        ClusterControllerService mockCCService = mock(ClusterControllerService.class);
+        Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx);
+        Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService);
+        Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService);
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
         final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
         RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
                 null, null, "1", null, null, null, true);
-        ClientRequest request = new ClientRequest(requestParameters);
+        ClientRequest request = new ClientRequest(requestParameters, appCtx);
         request.setJobId(new JobId(1));
         request.markCancellable();
         tracker.track(request);
@@ -89,7 +96,7 @@
         final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
         requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
                 null, null, null, true);
-        ClientRequest request2 = new ClientRequest(requestParameters);
+        ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
         request2.setJobId(new JobId(2));
         request2.markCancellable();
         tracker.track(request2);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index d4ba5ca..57bff8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.api;
 
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -36,10 +37,12 @@
      * Generates a {@link IClientRequest} based on the requests parameters
      *
      * @param requestParameters
+     * @param appCtx
      * @return the client request
      * @throws HyracksDataException
      */
-    IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+    IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+            throws HyracksDataException;
 
     /**
      * Ensures a client's request can be executed before its job is started
@@ -56,6 +59,6 @@
      * @param metadataProvider
      * @throws HyracksDataException
      */
-    void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider metadataProvider)
+    void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider<?, ?> metadataProvider)
             throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index d865b4f..da4d12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -82,13 +82,13 @@
 
     private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
 
-    private long createTime;
+    private final long createTime;
 
-    private long startTime;
+    private volatile long startTime;
 
     private String startTimeZoneId;
 
-    private long endTime;
+    private volatile long endTime;
 
     private JobStatus status;
 
@@ -98,7 +98,7 @@
 
     private List<Exception> pendingExceptions;
 
-    private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+    private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
 
     private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec,
             ActivityClusterGraph acg) {
@@ -222,6 +222,10 @@
         this.profile.setEndTime(endTime);
     }
 
+    public long getQueueWaitTimeInMillis() {
+        return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime;
+    }
+
     public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
         operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 40bc1ba..2031808 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -28,7 +28,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.TaskId;
@@ -92,10 +91,6 @@
         this.endTime = endTime;
     }
 
-    public long getQueueWaitTimeInNanos() {
-        return TimeUnit.MILLISECONDS.toNanos(startTime - createTime);
-    }
-
     @Override
     public ObjectNode toJSON() {
         ObjectMapper om = new ObjectMapper();