[ASTERIXDB-3343][API] Include job details in active/completed requests
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Include job status, create time, start time, queue time, and end time.
Change-Id: Id1bdc935be8bf84674aa6e35ee6c19f2ee1f7971
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18148
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
(cherry picked from commit b767d093235ee01b2b39d98f64592f0ffd822cf4)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index db633e6..25fcbf5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.om.base.ADateTime;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -101,8 +100,8 @@
json.put("elapsedTime", getElapsedTimeInSecs());
json.put("node", requestReference.getNode());
json.put("state", state.getLabel());
- json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
- json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+ json.put("userAgent", requestReference.getUserAgent());
+ json.put("remoteAddr", requestReference.getRemoteAddr());
json.put("cancellable", cancellable);
return json;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4f19366..28549f3 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,11 +18,17 @@
*/
package org.apache.asterix.translator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.AMutableDateTime;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -31,14 +37,17 @@
protected final long creationTime = System.nanoTime();
protected final Thread executor;
protected final String statement;
+ protected final ClusterControllerService ccService;
protected final String clientContextId;
protected volatile JobId jobId;
+ private String plan; // can be null
- public ClientRequest(ICommonRequestParameters requestParameters) {
+ public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
super(requestParameters.getRequestReference());
this.clientContextId = requestParameters.getClientContextId();
this.statement = requestParameters.getStatement();
this.executor = Thread.currentThread();
+ this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
}
@Override
@@ -46,6 +55,10 @@
return clientContextId;
}
+ public void setPlan(String plan) {
+ this.plan = plan;
+ }
+
public synchronized void setJobId(JobId jobId) {
this.jobId = jobId;
setRunning();
@@ -78,9 +91,53 @@
@Override
public ObjectNode asJson() {
ObjectNode json = super.asJson();
- json.put("jobId", jobId != null ? jobId.toString() : null);
+ putJobDetails(json);
json.put("statement", statement);
json.put("clientContextID", clientContextId);
+ if (plan != null) {
+ json.put("plan", plan);
+ }
return json;
}
+
+ private void putJobDetails(ObjectNode json) {
+ if (jobId == null) {
+ json.putNull("jobId");
+ } else {
+ try {
+ json.put("jobId", jobId.toString());
+ JobRun jobRun = ccService.getJobManager().get(jobId);
+ if (jobRun != null) {
+ json.put("jobStatus", String.valueOf(jobRun.getStatus()));
+ putJobRequiredResources(json, jobRun);
+ putTimes(json, jobRun);
+ }
+ } catch (Throwable th) {
+ // ignore
+ }
+ }
+ }
+
+ private static void putTimes(ObjectNode json, JobRun jobRun) {
+ AMutableDateTime dateTime = new AMutableDateTime(0);
+ putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
+ putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
+ putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
+ json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()));
+ }
+
+ private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
+ IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
+ if (jobCapacity != null) {
+ json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
+ json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize());
+ }
+ }
+
+ private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) {
+ if (time > 0) {
+ dateTime.setValue(time);
+ json.put(label, dateTime.toSimpleString());
+ }
+ }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index 84bee6a..6893d5c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.api.ISchedulableClientRequest;
import org.apache.asterix.common.api.RequestReference;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.http.HttpHeaders;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
@@ -46,8 +47,9 @@
}
@Override
- public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
- return new ClientRequest(requestParameters);
+ public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+ throws HyracksDataException {
+ return new ClientRequest(requestParameters, appCtx);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 7ac1431..fa1baa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.translator.ResultMetadata;
@@ -97,7 +98,7 @@
}
}
}
- metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos());
+ metadata.setQueueWaitTimeInNanos(TimeUnit.MILLISECONDS.toNanos(run.getQueueWaitTimeInMillis()));
}
metadata.setProcessedObjects(processedObjects);
metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0db8dff..1110cd3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3682,8 +3682,8 @@
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
- Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
+ ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams,
+ IStatementRewriter stmtRewriter) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String datasetName = stmtInsertUpsert.getDatasetName();
metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName,
@@ -3720,10 +3720,11 @@
throw e;
}
};
-
+ IRequestTracker requestTracker = appCtx.getRequestTracker();
+ ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, false);
+ reqParams, false, clientRequest);
} else {
locker.lock();
try {
@@ -4726,13 +4727,13 @@
}
};
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, true);
+ requestParameters, true, clientRequest);
}
private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
- throws Exception {
+ ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
+ ClientRequest clientRequest) throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
@@ -4748,7 +4749,7 @@
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
- updateJobStats(id, stats, metadataProvider.getResultSetId());
+ updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
metadataProvider.findOutputRecordType(), stats, sessionOutput));
responsePrinter.printResults();
@@ -4756,7 +4757,7 @@
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
- updateJobStats(id, stats, metadataProvider.getResultSetId());
+ updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
responsePrinter.addResultPrinter(
new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId)));
responsePrinter.printResults();
@@ -4771,7 +4772,8 @@
}
}
- private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+ private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest)
+ throws HyracksDataException {
final ClusterControllerService controllerService =
(ClusterControllerService) appCtx.getServiceContext().getControllerService();
org.apache.asterix.translator.ResultMetadata resultMetadata =
@@ -4784,6 +4786,7 @@
stats.setJobProfile(resultMetadata.getJobProfile());
apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
}
+ clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
}
@@ -5234,8 +5237,8 @@
}
protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
- final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
- appCtx.getRequestTracker().track(clientRequest);
+ final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+ this.appCtx.getRequestTracker().track(clientRequest);
}
protected void validateStatements(IRequestParameters requestParameters)
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index b38c366..0e063ca 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -35,8 +35,10 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.runtime.utils.RequestTracker;
import org.apache.asterix.translator.ClientRequest;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.junit.Test;
@@ -66,13 +68,18 @@
// Tests the case that query is not in the map.
IServletRequest mockRequest = mockRequest("1");
IServletResponse mockResponse = mock(IServletResponse.class);
+ ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class);
+ ClusterControllerService mockCCService = mock(ClusterControllerService.class);
+ Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx);
+ Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService);
+ Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService);
cancellationServlet.handle(mockRequest, mockResponse);
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
null, null, "1", null, null, null, true);
- ClientRequest request = new ClientRequest(requestParameters);
+ ClientRequest request = new ClientRequest(requestParameters, appCtx);
request.setJobId(new JobId(1));
request.markCancellable();
tracker.track(request);
@@ -89,7 +96,7 @@
final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
null, null, null, true);
- ClientRequest request2 = new ClientRequest(requestParameters);
+ ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
request2.setJobId(new JobId(2));
request2.markCancellable();
tracker.track(request2);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 95ed22e..f0b4944 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.api;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
@@ -35,10 +36,12 @@
* Generates a {@link IClientRequest} based on the requests parameters
*
* @param requestParameters
+ * @param appCtx
* @return the client request
* @throws HyracksDataException
*/
- IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+ IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+ throws HyracksDataException;
/**
* Ensures a client's request can be executed before its job is started
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index d865b4f..da4d12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -82,13 +82,13 @@
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
- private long createTime;
+ private final long createTime;
- private long startTime;
+ private volatile long startTime;
private String startTimeZoneId;
- private long endTime;
+ private volatile long endTime;
private JobStatus status;
@@ -98,7 +98,7 @@
private List<Exception> pendingExceptions;
- private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+ private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec,
ActivityClusterGraph acg) {
@@ -222,6 +222,10 @@
this.profile.setEndTime(endTime);
}
+ public long getQueueWaitTimeInMillis() {
+ return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime;
+ }
+
public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 40bc1ba..2031808 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.TaskId;
@@ -92,10 +91,6 @@
this.endTime = endTime;
}
- public long getQueueWaitTimeInNanos() {
- return TimeUnit.MILLISECONDS.toNanos(startTime - createTime);
- }
-
@Override
public ObjectNode toJSON() {
ObjectMapper om = new ObjectMapper();