[ASTERIXDB-3343][API] Include job details in active/completed requests
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Include job status, create time, start time, queue time, and end time.
Change-Id: Id1bdc935be8bf84674aa6e35ee6c19f2ee1f7971
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18148
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index db633e6..0436ea8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.om.base.ADateTime;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -98,11 +97,11 @@
ObjectNode json = JSONUtil.createObject();
json.put("uuid", requestReference.getUuid());
json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
- json.put("elapsedTime", getElapsedTimeInSecs());
+ json.put("elapsedTime", getElapsedTimeInSecs() + "s");
json.put("node", requestReference.getNode());
json.put("state", state.getLabel());
- json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
- json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+ json.put("userAgent", requestReference.getUserAgent());
+ json.put("remoteAddr", requestReference.getRemoteAddr());
json.put("cancellable", cancellable);
return json;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 4f19366..983f99c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,11 +18,18 @@
*/
package org.apache.asterix.translator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.AMutableDateTime;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
+import org.apache.hyracks.util.StorageUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -31,14 +38,17 @@
protected final long creationTime = System.nanoTime();
protected final Thread executor;
protected final String statement;
+ protected final ClusterControllerService ccService;
protected final String clientContextId;
protected volatile JobId jobId;
+ private String plan; // can be null
- public ClientRequest(ICommonRequestParameters requestParameters) {
+ public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) {
super(requestParameters.getRequestReference());
this.clientContextId = requestParameters.getClientContextId();
this.statement = requestParameters.getStatement();
this.executor = Thread.currentThread();
+ this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
}
@Override
@@ -46,6 +56,10 @@
return clientContextId;
}
+ public void setPlan(String plan) {
+ this.plan = plan;
+ }
+
public synchronized void setJobId(JobId jobId) {
this.jobId = jobId;
setRunning();
@@ -78,9 +92,53 @@
@Override
public ObjectNode asJson() {
ObjectNode json = super.asJson();
- json.put("jobId", jobId != null ? jobId.toString() : null);
+ putJobDetails(json);
json.put("statement", statement);
json.put("clientContextID", clientContextId);
+ if (plan != null) {
+ json.put("plan", plan);
+ }
return json;
}
+
+ private void putJobDetails(ObjectNode json) {
+ if (jobId == null) {
+ json.putNull("jobId");
+ } else {
+ try {
+ json.put("jobId", jobId.toString());
+ JobRun jobRun = ccService.getJobManager().get(jobId);
+ if (jobRun != null) {
+ json.put("jobStatus", String.valueOf(jobRun.getStatus()));
+ putJobRequiredResources(json, jobRun);
+ putTimes(json, jobRun);
+ }
+ } catch (Throwable th) {
+ // ignore
+ }
+ }
+ }
+
+ private static void putTimes(ObjectNode json, JobRun jobRun) {
+ AMutableDateTime dateTime = new AMutableDateTime(0);
+ putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime);
+ putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime);
+ putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime);
+ json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis()) + "s");
+ }
+
+ private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) {
+ IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity();
+ if (jobCapacity != null) {
+ json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores());
+ json.put("jobRequiredMemory", StorageUtil.toHumanReadableSize(jobCapacity.getAggregatedMemoryByteSize()));
+ }
+ }
+
+ private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) {
+ if (time > 0) {
+ dateTime.setValue(time);
+ json.put(label, dateTime.toSimpleString());
+ }
+ }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
index ed68d8b..6ded860 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.api.ISchedulableClientRequest;
import org.apache.asterix.common.api.RequestReference;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.http.HttpHeaders;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -47,8 +48,9 @@
}
@Override
- public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException {
- return new ClientRequest(requestParameters);
+ public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+ throws HyracksDataException {
+ return new ClientRequest(requestParameters, appCtx);
}
@Override
@@ -57,7 +59,7 @@
}
@Override
- public void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider metadataProvider)
+ public void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider<?, ?> metadataProvider)
throws HyracksDataException {
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index a9a3aac..c419d0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.translator.ResultMetadata;
@@ -97,7 +98,7 @@
}
}
}
- metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos());
+ metadata.setQueueWaitTimeInNanos(TimeUnit.MILLISECONDS.toNanos(run.getQueueWaitTimeInMillis()));
}
metadata.setProcessedObjects(processedObjects);
metadata.setBufferCacheHitRatio(pagesRead > 0 ? (pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 100e524..34648dd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4101,13 +4101,13 @@
};
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, true, null);
+ requestParameters, true, null, clientRequest);
}
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
- Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
+ ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams,
+ IStatementRewriter stmtRewriter) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String datasetName = stmtInsertUpsert.getDatasetName();
metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getNamespace(), datasetName,
@@ -4147,9 +4147,11 @@
throw e;
}
};
+ IRequestTracker requestTracker = appCtx.getRequestTracker();
+ ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, false, stmt);
+ reqParams, false, stmt, clientRequest);
} else {
locker.lock();
JobId jobId = null;
@@ -4173,9 +4175,6 @@
participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions));
}
jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- final IRequestTracker requestTracker = appCtx.getRequestTracker();
- final ClientRequest clientRequest =
- (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
clientRequest.setJobId(jobId);
String nameBefore = Thread.currentThread().getName();
try {
@@ -5262,13 +5261,13 @@
}
};
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, true, null);
+ requestParameters, true, null, clientRequest);
}
private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
- Statement atomicStmt) throws Exception {
+ Statement atomicStmt, ClientRequest clientRequest) throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
@@ -5284,7 +5283,7 @@
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
- updateJobStats(id, stats, metadataProvider.getResultSetId());
+ updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
metadataProvider.findOutputRecordType(), stats, sessionOutput));
responsePrinter.printResults();
@@ -5292,7 +5291,7 @@
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
- updateJobStats(id, stats, metadataProvider.getResultSetId());
+ updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
responsePrinter.addResultPrinter(
new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId)));
responsePrinter.printResults();
@@ -5307,7 +5306,8 @@
}
}
- private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+ private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest)
+ throws HyracksDataException {
final ClusterControllerService controllerService =
(ClusterControllerService) appCtx.getServiceContext().getControllerService();
org.apache.asterix.translator.ResultMetadata resultMetadata =
@@ -5321,6 +5321,7 @@
stats.setJobProfile(resultMetadata.getJobProfile());
apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile());
}
+ clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
}
@@ -5569,8 +5570,8 @@
}
protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
- final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
- appCtx.getRequestTracker().track(clientRequest);
+ final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx);
+ this.appCtx.getRequestTracker().track(clientRequest);
}
protected void validateStatements(IRequestParameters requestParameters)
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index b38c366..0e063ca 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -35,8 +35,10 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.runtime.utils.RequestTracker;
import org.apache.asterix.translator.ClientRequest;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.junit.Test;
@@ -66,13 +68,18 @@
// Tests the case that query is not in the map.
IServletRequest mockRequest = mockRequest("1");
IServletResponse mockResponse = mock(IServletResponse.class);
+ ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class);
+ ClusterControllerService mockCCService = mock(ClusterControllerService.class);
+ Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx);
+ Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService);
+ Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService);
cancellationServlet.handle(mockRequest, mockResponse);
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null,
null, null, "1", null, null, null, true);
- ClientRequest request = new ClientRequest(requestParameters);
+ ClientRequest request = new ClientRequest(requestParameters, appCtx);
request.setJobId(new JobId(1));
request.markCancellable();
tracker.track(request);
@@ -89,7 +96,7 @@
final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2",
null, null, null, true);
- ClientRequest request2 = new ClientRequest(requestParameters);
+ ClientRequest request2 = new ClientRequest(requestParameters, appCtx);
request2.setJobId(new JobId(2));
request2.markCancellable();
tracker.track(request2);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index d4ba5ca..57bff8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.api;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
@@ -36,10 +37,12 @@
* Generates a {@link IClientRequest} based on the requests parameters
*
* @param requestParameters
+ * @param appCtx
* @return the client request
* @throws HyracksDataException
*/
- IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
+ IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx)
+ throws HyracksDataException;
/**
* Ensures a client's request can be executed before its job is started
@@ -56,6 +59,6 @@
* @param metadataProvider
* @throws HyracksDataException
*/
- void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider metadataProvider)
+ void ensureAuthorized(ICommonRequestParameters requestParameters, IMetadataProvider<?, ?> metadataProvider)
throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index d865b4f..da4d12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -82,13 +82,13 @@
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
- private long createTime;
+ private final long createTime;
- private long startTime;
+ private volatile long startTime;
private String startTimeZoneId;
- private long endTime;
+ private volatile long endTime;
private JobStatus status;
@@ -98,7 +98,7 @@
private List<Exception> pendingExceptions;
- private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+ private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec,
ActivityClusterGraph acg) {
@@ -222,6 +222,10 @@
this.profile.setEndTime(endTime);
}
+ public long getQueueWaitTimeInMillis() {
+ return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime;
+ }
+
public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
index 40bc1ba..2031808 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.TaskId;
@@ -92,10 +91,6 @@
this.endTime = endTime;
}
- public long getQueueWaitTimeInNanos() {
- return TimeUnit.MILLISECONDS.toNanos(startTime - createTime);
- }
-
@Override
public ObjectNode toJSON() {
ObjectMapper om = new ObjectMapper();