[ASTERIXDB-3343][API] Add servlet to get acitve/completed requests
- user model changes: no
- storage format changes: no
- interface changes: yes
Backports from:
[ASTERIXDB-3343][API] Add servlet to get completed requests
(cherry picked from commit 88c25279458badf088ae36ecef2bf50a66d9638c)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18109
[ASTERIXDB-3343][API] Include job details in active/completed requests
(cherry picked from commit b767d093235ee01b2b39d98f64592f0ffd822cf4)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110
[ASTERIXDB-3343][API] Capture job state changes in client requests
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18279
[ASTERIXDB-3343][API] Add redact param to redact active/completed requests
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283
[ASTERIXDB-3343][API] Return new list when getting completed requests
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18306
Ext-ref: MB-62288
Change-Id: Iaf0ad268cb0629c1314d983cb30a499d554dafd4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18495
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 99cda09..0fafc47 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,7 +20,6 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.om.base.ADateTime;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -93,15 +92,25 @@
return JSONUtil.convertNodeUnchecked(asJson());
}
- protected ObjectNode asJson() {
+ @Override
+ public ObjectNode asJson() {
+ return putJson();
+ }
+
+ @Override
+ public ObjectNode asRedactedJson() {
+ return putJson();
+ }
+
+ private ObjectNode putJson() {
ObjectNode json = JSONUtil.createObject();
json.put("uuid", requestReference.getUuid());
json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
json.put("elapsedTime", getElapsedTimeInSecs());
json.put("node", requestReference.getNode());
json.put("state", state.getLabel());
- json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
- json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+ json.put("userAgent", requestReference.getUserAgent());
+ json.put("remoteAddr", requestReference.getRemoteAddr());
json.put("cancellable", cancellable);
return json;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index c19bb02..31f1979 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -18,11 +18,22 @@
*/
package org.apache.asterix.translator;
+import static org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.api.ICommonRequestParameters;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.AMutableDateTime;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -32,13 +43,16 @@
protected final Thread executor;
protected final String statement;
protected final String clientContextId;
+ protected final JobState jobState;
protected volatile JobId jobId;
+ private volatile String plan; // can be null
public ClientRequest(ICommonRequestParameters requestParameters) {
super(requestParameters.getRequestReference());
this.clientContextId = requestParameters.getClientContextId();
this.statement = requestParameters.getStatement();
this.executor = Thread.currentThread();
+ this.jobState = new JobState();
}
@Override
@@ -46,6 +60,10 @@
return clientContextId;
}
+ public void setPlan(String plan) {
+ this.plan = plan;
+ }
+
public synchronized void setJobId(JobId jobId) {
this.jobId = jobId;
setRunning();
@@ -76,11 +94,93 @@
}
@Override
- protected ObjectNode asJson() {
+ public ObjectNode asJson() {
ObjectNode json = super.asJson();
- json.put("jobId", jobId != null ? jobId.toString() : null);
- json.put("statement", statement);
+ return asJson(json, false);
+ }
+
+ @Override
+ public ObjectNode asRedactedJson() {
+ ObjectNode json = super.asRedactedJson();
+ return asJson(json, true);
+ }
+
+ private ObjectNode asJson(ObjectNode json, boolean redact) {
+ putJobDetails(json, redact);
+ json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement);
json.put("clientContextID", clientContextId);
+ if (plan != null) {
+ json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
+ }
return json;
}
+
+ @Override
+ public void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+ IJobCapacityController.JobSubmissionStatus status) {
+ jobState.createTime = System.currentTimeMillis();
+ jobState.status = status == QUEUE ? JobStatus.PENDING : JobStatus.RUNNING;
+ jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores();
+ jobState.requiredMemoryInBytes = requiredClusterCapacity.getAggregatedMemoryByteSize();
+ }
+
+ @Override
+ public void jobStarted(JobId jobId) {
+ jobState.startTime = System.currentTimeMillis();
+ jobState.status = JobStatus.RUNNING;
+ }
+
+ @Override
+ public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+ jobState.endTime = System.currentTimeMillis();
+ jobState.status = jobStatus;
+ if (exceptions != null && !exceptions.isEmpty()) {
+ jobState.errorMsg = processException(exceptions.get(0));
+ }
+ }
+
+ protected String processException(Exception e) {
+ return ExceptionUtils.unwrap(e).getMessage();
+ }
+
+ private void putJobDetails(ObjectNode json, boolean redact) {
+ try {
+ json.put("jobId", jobId != null ? jobId.toString() : null);
+ putJobState(json, jobState, redact);
+ } catch (Throwable th) {
+ // ignore
+ }
+ }
+
+ private static void putJobState(ObjectNode json, JobState state, boolean redact) {
+ AMutableDateTime dateTime = new AMutableDateTime(0);
+ putTime(json, state.createTime, "jobCreateTime", dateTime);
+ putTime(json, state.startTime, "jobStartTime", dateTime);
+ putTime(json, state.endTime, "jobEndTime", dateTime);
+ long queueTime = (state.startTime > 0 ? state.startTime : System.currentTimeMillis()) - state.createTime;
+ json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime));
+ json.put("jobStatus", String.valueOf(state.status));
+ json.put("jobRequiredCPUs", state.requiredCPUs);
+ json.put("jobRequiredMemory", state.requiredMemoryInBytes);
+ if (state.errorMsg != null) {
+ json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
+ }
+ }
+
+ private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) {
+ if (time > 0) {
+ dateTime.setValue(time);
+ json.put(label, dateTime.toSimpleString());
+ }
+ }
+
+ static class JobState {
+ volatile long createTime;
+ volatile long startTime;
+ volatile long endTime;
+ volatile long requiredMemoryInBytes;
+ volatile int requiredCPUs;
+ volatile JobStatus status;
+ volatile String errorMsg;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
new file mode 100644
index 0000000..b6c6a71
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public abstract class AbstractRequestsServlet extends AbstractServlet {
+
+ public static final String REDACT_PARAM = "redact";
+ protected final ICcApplicationContext appCtx;
+
+ public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
+ super(ctx, paths);
+ this.appCtx = appCtx;
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ ArrayNode requestsJson = JSONUtil.createArray();
+ Collection<IClientRequest> requests = getRequests();
+ String redact = request.getParameter(REDACT_PARAM);
+ if (Boolean.parseBoolean(redact)) {
+ for (IClientRequest req : requests) {
+ requestsJson.add(req.asRedactedJson());
+ }
+ } else {
+ for (IClientRequest req : requests) {
+ requestsJson.add(req.asJson());
+ }
+ }
+ HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
+ response.setStatus(HttpResponseStatus.OK);
+ JSONUtil.writeNode(response.writer(), requestsJson);
+ response.writer().flush();
+ }
+
+ abstract Collection<IClientRequest> getRequests();
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java
similarity index 83%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java
index 7ba2867..7e00e9a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java
@@ -19,6 +19,7 @@
package org.apache.asterix.api.http.server;
import java.io.IOException;
+import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter;
@@ -27,7 +28,6 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -35,24 +35,27 @@
import io.netty.handler.codec.http.HttpResponseStatus;
/**
- * The servlet provides a REST API for cancelling an on-going query.
+ * The servlet provides a REST API for getting the running queries or cancelling an on-going one.
*/
-public class CcQueryCancellationServlet extends AbstractServlet {
+public class ActiveRequestsServlet extends AbstractRequestsServlet {
public static final String REQUEST_UUID_PARAM_NAME = "request_id";
private static final Logger LOGGER = LogManager.getLogger();
- private final ICcApplicationContext appCtx;
- public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx,
- String... paths) {
- super(ctx, paths);
- this.appCtx = appCtx;
+ public ActiveRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
+ super(ctx, appCtx, paths);
+ }
+
+ @Override
+ public Collection<IClientRequest> getRequests() {
+ return appCtx.getRequestTracker().getRunningRequests();
}
@Override
protected void delete(IServletRequest request, IServletResponse response) throws IOException {
String uuid = request.getParameter(REQUEST_UUID_PARAM_NAME);
String clientCtxId = request.getParameter(Parameter.CLIENT_ID.str());
+ LOGGER.debug("received cancel request, uuid={}, clientCtxId={}", uuid, clientCtxId);
if (uuid == null && clientCtxId == null) {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java
new file mode 100644
index 0000000..92eacbb
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+
+public class CompletedRequestsServlet extends AbstractRequestsServlet {
+
+ public CompletedRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
+ super(ctx, appCtx, paths);
+ }
+
+ @Override
+ public Collection<IClientRequest> getRequests() {
+ return appCtx.getRequestTracker().getCompletedRequests();
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index b2134dc..5dd9430 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.server.CcQueryCancellationServlet.REQUEST_UUID_PARAM_NAME;
+import static org.apache.asterix.api.http.server.ActiveRequestsServlet.REQUEST_UUID_PARAM_NAME;
import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
import java.util.concurrent.ConcurrentMap;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 662884d..0f2780f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.util.SingleThreadEventProcessor;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.Level;
@@ -89,7 +90,8 @@
// *** IJobLifecycleListener
@Override
- public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
+ public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification,
+ IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException {
Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
if (property != null) {
@@ -118,7 +120,7 @@
}
@Override
- public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
@@ -127,8 +129,8 @@
}
@Override
- public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
- throws HyracksException {
+ public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+ List<Exception> exceptions) throws HyracksException {
EntityId entityId = jobId2EntityId.get(jobId);
if (entityId != null) {
LOGGER.debug("notified of ingestion job finish {}", jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6b40cbf..31b903b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -3664,8 +3664,8 @@
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters,
- Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
+ ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams,
+ IStatementRewriter stmtRewriter) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String datasetName = stmtInsertUpsert.getDatasetName();
metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName,
@@ -3702,10 +3702,11 @@
throw e;
}
};
-
+ IRequestTracker requestTracker = appCtx.getRequestTracker();
+ ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid());
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, false);
+ reqParams, false, clientRequest);
} else {
locker.lock();
try {
@@ -4702,13 +4703,13 @@
}
};
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- requestParameters, true);
+ requestParameters, true, clientRequest);
}
private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
- throws Exception {
+ ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable,
+ ClientRequest clientRequest) throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
@@ -4724,7 +4725,7 @@
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId);
- updateJobStats(id, stats, metadataProvider.getResultSetId());
+ updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader,
metadataProvider.findOutputRecordType(), stats, sessionOutput));
responsePrinter.printResults();
@@ -4732,7 +4733,7 @@
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
- updateJobStats(id, stats, metadataProvider.getResultSetId());
+ updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest);
responsePrinter.addResultPrinter(
new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId)));
responsePrinter.printResults();
@@ -4747,7 +4748,8 @@
}
}
- private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException {
+ private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest)
+ throws HyracksDataException {
final ClusterControllerService controllerService =
(ClusterControllerService) appCtx.getServiceContext().getControllerService();
org.apache.asterix.translator.ResultMetadata resultMetadata =
@@ -4757,6 +4759,7 @@
if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
stats.setJobProfile(resultMetadata.getJobProfile());
}
+ clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan());
stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount());
WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector);
}
@@ -4836,6 +4839,7 @@
appCtx.getReceptionist().ensureSchedulable(schedulableRequest);
// ensure request not cancelled before running job
ensureNotCancelled(clientRequest);
+ jobSpec.setRequestId(clientRequest.getId());
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId,
@@ -5208,7 +5212,7 @@
protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters);
- appCtx.getRequestTracker().track(clientRequest);
+ this.appCtx.getRequestTracker().track(clientRequest);
}
protected void validateStatements(IRequestParameters requestParameters)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index bc67823..31ec44c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -37,9 +37,9 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.http.IQueryWebServerRegistrant;
+import org.apache.asterix.api.http.server.ActiveRequestsServlet;
import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
import org.apache.asterix.api.http.server.ClusterApiServlet;
import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
import org.apache.asterix.api.http.server.ConnectorApiServlet;
@@ -171,6 +171,7 @@
ccServiceCtx.setDistributedState(proxy);
MetadataManager.initialize(proxy, metadataProperties, appCtx);
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
+ ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker());
// create event loop groups
webManager = new WebManager();
@@ -315,7 +316,7 @@
ConcurrentMap<String, Object> ctx = server.ctx();
switch (key) {
case Servlets.RUNNING_REQUESTS:
- return new CcQueryCancellationServlet(ctx, appCtx, paths);
+ return new ActiveRequestsServlet(ctx, appCtx, paths);
case Servlets.QUERY_STATUS:
return new QueryStatusApiServlet(ctx, appCtx, paths);
case Servlets.QUERY_RESULT:
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 68fb9a8..673611d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -27,7 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
+import org.apache.asterix.api.http.server.ActiveRequestsServlet;
import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.api.RequestReference;
@@ -35,8 +35,10 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.runtime.utils.RequestTracker;
import org.apache.asterix.translator.ClientRequest;
+import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.junit.Test;
@@ -57,8 +59,8 @@
RequestTracker tracker = new RequestTracker(appCtx);
Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker);
// Creates a query cancellation servlet.
- CcQueryCancellationServlet cancellationServlet =
- new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
+ ActiveRequestsServlet cancellationServlet =
+ new ActiveRequestsServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
// Adds mocked Hyracks client connection into the servlet context.
IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
@@ -66,6 +68,11 @@
// Tests the case that query is not in the map.
IServletRequest mockRequest = mockRequest("1");
IServletResponse mockResponse = mock(IServletResponse.class);
+ ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class);
+ ClusterControllerService mockCCService = mock(ClusterControllerService.class);
+ Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx);
+ Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService);
+ Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService);
cancellationServlet.handle(mockRequest, mockResponse);
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index cb123bf..c9d862c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -53,6 +53,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -135,8 +136,9 @@
TestUserActor user = new TestUserActor("Xikui", mdProvider, null);
Action start = user.startActivity(eventsListener);
startingSubscriber.sync();
- activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
- activeJobNotificationHandler.notifyJobStart(jobId);
+ activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec,
+ IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ activeJobNotificationHandler.notifyJobStart(jobId, jobSpec);
try {
eventsListener.refreshStats(1000);
} catch (HyracksDataException e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
index 52d4225..883a0cb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.mockito.Mockito;
public class TestClusterControllerActor extends Actor {
@@ -49,8 +50,8 @@
JobSpecification jobSpecification = Mockito.mock(JobSpecification.class);
Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
.thenReturn(entityId);
- handler.notifyJobCreation(jobId, jobSpecification);
- handler.notifyJobStart(jobId);
+ handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE);
+ handler.notifyJobStart(jobId, null);
}
};
add(startJob);
@@ -72,7 +73,7 @@
Action delivery = new Action() {
@Override
protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
- handler.notifyJobFinish(jobId, jobStatus, exceptions);
+ handler.notifyJobFinish(jobId, null, jobStatus, exceptions);
}
};
add(delivery);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e..07f8eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
*/
-- param client_context_id=ensure_running_query
-- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst
WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
new file mode 100644
index 0000000..de35939
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b..170838e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000..15affd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+ "cancellable": true,
+ "clientContextID": "completed_requests_query",
+ "elapsedTime": "R{.*}",
+ "jobCreateTime": "R{.*}",
+ "jobEndTime": "R{.*}",
+ "jobId": "R{.*}",
+ "jobQueueTime": "R{.*}",
+ "jobRequiredCPUs": "R{.*}",
+ "jobRequiredMemory": "R{.*}",
+ "jobStartTime": "R{.*}",
+ "jobStatus": "TERMINATED",
+ "node": "R{.*}",
+ "remoteAddr": "R{.*}",
+ "requestTime": "R{.*}",
+ "state": "completed",
+ "userAgent": "R{.*}",
+ "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 921fb64..ee518dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -18,8 +18,16 @@
*/
package org.apache.asterix.common.api;
+import java.util.List;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
public interface IClientRequest {
@@ -86,7 +94,44 @@
void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
/**
- * @return A json representation of this request
+ * @return A json string representation of this request
*/
String toJson();
+
+ /**
+ * @return A json node representation of this request
+ */
+ ObjectNode asJson();
+
+ /**
+ * @return A redacted json node representation of this request
+ */
+ ObjectNode asRedactedJson();
+
+ /**
+ * Called when the job is created.
+ *
+ * @param jobId the job id
+ * @param requiredClusterCapacity the required resources by the job
+ * @param status the status of the job; whether it will be executed or queued
+ */
+ void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity,
+ IJobCapacityController.JobSubmissionStatus status);
+
+ /**
+ * Called when the job starts running.
+ *
+ * @param jobId the job id
+ */
+ void jobStarted(JobId jobId);
+
+ /**
+ * Called when the job finishes.
+ *
+ * @param jobId the job id
+ * @param jobStatus the final job status
+ * @param exceptions exceptions encountered if any
+ */
+ void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions);
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
index 95ed22e..153eea9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -34,9 +34,9 @@
/**
* Generates a {@link IClientRequest} based on the requests parameters
*
- * @param requestParameters
+ * @param requestParameters the request parameters
* @return the client request
- * @throws HyracksDataException
+ * @throws HyracksDataException HyracksDataException
*/
IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015..5a99f09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -21,8 +21,9 @@
import java.util.Collection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
-public interface IRequestTracker {
+public interface IRequestTracker extends IJobLifecycleListener {
/**
* Starts tracking {@code request}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
index a2ebd9a..02f20f8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.util.annotations.ThreadSafe;
@ThreadSafe
@@ -44,17 +45,19 @@
private final Map<String, Set<JobId>> nodeJobs = new HashMap<>();
@Override
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) {
getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId));
}
@Override
- public synchronized void notifyJobStart(JobId jobId) {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) {
// nothing to do
}
@Override
- public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) {
+ public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+ List<Exception> exceptions) {
nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId));
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6..a754eb6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,20 +31,32 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
public class RequestTracker implements IRequestTracker {
private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
- private final CircularFifoQueue<IClientRequest> completedRequests;
+ private final Map<String, IClientRequest> completedRequests;
private final ICcApplicationContext ccAppCtx;
private final AtomicLong numRequests;
public RequestTracker(ICcApplicationContext ccAppCtx) {
this.ccAppCtx = ccAppCtx;
- completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+ int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+ completedRequests = new LinkedHashMap<>(archiveSize) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) {
+ return size() > archiveSize;
+ }
+ };
numRequests = new AtomicLong(0);
}
@@ -94,7 +108,7 @@
@Override
public synchronized Collection<IClientRequest> getCompletedRequests() {
- return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+ return Collections.unmodifiableCollection(new ArrayList<>(completedRequests.values()));
}
private void cancel(IClientRequest request) throws HyracksDataException {
@@ -114,10 +128,55 @@
}
private synchronized void archive(IClientRequest request) {
- completedRequests.add(request);
+ completedRequests.put(request.getId(), request);
}
public long getTotalNumberOfRequests() {
return numRequests.get();
}
+
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException {
+ String requestId = spec.getRequestId();
+ if (requestId != null) {
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+ }
+ }
+ }
+
+ @Override
+ public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
+ String requestId = spec.getRequestId();
+ if (requestId != null) {
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobStarted(jobId);
+ }
+ }
+ }
+
+ @Override
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
+ String requestId = spec.getRequestId();
+ if (requestId != null) {
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobFinished(jobId, jobStatus, exceptions);
+ }
+ }
+ }
+
+ private IClientRequest getRequest(String requestId) {
+ IClientRequest clientRequest = runningRequests.get(requestId);
+ if (clientRequest != null) {
+ return clientRequest;
+ }
+ synchronized (this) {
+ return completedRequests.get(requestId);
+ }
+ }
}
diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
index be75ecb..82d8cb7 100644
--- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
+++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -50,18 +51,18 @@
jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation));
JobId jobId = new JobId(1);
- nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+ nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
// make sure nc1 has a pending job
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty());
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty());
- nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null);
+ nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, null);
// make sure nc1 doesn't have pending jobs anymore
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
// make sure node doesn't have pending jobs after failure
jobId = new JobId(2);
- nodeJobTracker.notifyJobCreation(jobId, jobSpec);
+ nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE);
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1);
nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1));
Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty());
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 338c331..6773dde 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
/**
* A listener for job related events
@@ -29,27 +30,33 @@
/**
* Notify the listener that a job has been created
*
- * @param jobId
- * @param spec
- * @throws HyracksException
+ * @param jobId the job id
+ * @param spec the job specification
+ * @param status the status of the job; whether it will be executed or queued
+ * @throws HyracksException HyracksException
*/
- void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException;
+ void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException;
/**
* Notify the listener that the job has started on the cluster controller
*
- * @param jobId
- * @throws HyracksException
+ * @param jobId the job id
+ * @param spec the job specification
+ *
+ * @throws HyracksException HyracksException
*/
- void notifyJobStart(JobId jobId) throws HyracksException;
+ void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException;
/**
* Notify the listener that the job has been terminated, passing exceptions in case of failure
*
- * @param jobId
- * @param jobStatus
- * @param exceptions
- * @throws HyracksException
+ * @param jobId the job id
+ * @param spec the job specification
+ * @param jobStatus the job status
+ * @param exceptions the job exceptions
+ * @throws HyracksException HyracksException
*/
- void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException;
+ void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 58336a0..6d0ce6b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -79,6 +79,8 @@
private long maxWarnings;
+ private String requestId;
+
private IJobletEventListenerFactory jobletEventListenerFactory;
private IGlobalJobDataFactory globalJobDataFactory;
@@ -254,6 +256,14 @@
this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
}
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
public void setFrameSize(int frameSize) {
this.frameSize = frameSize;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index de166dd..555d37e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ServiceContext;
@@ -79,22 +80,23 @@
jobLifecycleListeners.add(jobLifecycleListener);
}
- public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobStart(jobId);
+ l.notifyJobStart(jobId, spec);
}
}
- public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
- throws HyracksException {
+ public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus,
+ List<Exception> exceptions) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobFinish(jobId, jobStatus, exceptions);
+ l.notifyJobFinish(jobId, spec, jobStatus, exceptions);
}
}
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobCreation(jobId, spec);
+ l.notifyJobCreation(jobId, spec, status);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index da840af..567d20c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -115,7 +115,7 @@
public void startJob() throws HyracksException {
startRunnableActivityClusters();
- ccs.getContext().notifyJobStart(jobRun.getJobId());
+ ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification());
}
public void cancelJob(IResultCallback<Void> callback) throws HyracksException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index ad97188..d803c88 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -112,7 +112,7 @@
JobSpecification job = jobRun.getJobSpecification();
IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job);
CCServiceContext serviceCtx = ccs.getContext();
- serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+ serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status);
switch (status) {
case QUEUE:
queueJob(jobRun);
@@ -148,7 +148,8 @@
CCServiceContext serviceCtx = ccs.getContext();
if (serviceCtx != null) {
try {
- serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
+ serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION,
+ exceptions);
} catch (Exception e) {
LOGGER.error("Exception notifying cancel on pending job {}", jobId, e);
throw HyracksDataException.create(e);
@@ -224,7 +225,8 @@
Throwable caughtException = null;
CCServiceContext serviceCtx = ccs.getContext();
try {
- serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions());
+ serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(),
+ run.getPendingExceptions());
} catch (Exception e) {
LOGGER.error("Exception notifying job finish {}", jobId, e);
caughtException = e;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
index 0cc09b4..06e955a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java
@@ -82,13 +82,13 @@
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
- private long createTime;
+ private final long createTime;
- private long startTime;
+ private volatile long startTime;
private String startTimeZoneId;
- private long endTime;
+ private volatile long endTime;
private JobStatus status;
@@ -98,7 +98,7 @@
private List<Exception> pendingExceptions;
- private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
+ private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations;
private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec,
ActivityClusterGraph acg) {
@@ -218,6 +218,10 @@
this.endTime = endTime;
}
+ public long getQueueWaitTimeInMillis() {
+ return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime;
+ }
+
public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) {
operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 80d7630..b6274d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.result.IJobResultCallback;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.IResultStateRecord;
@@ -77,7 +78,8 @@
}
@Override
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec,
+ IJobCapacityController.JobSubmissionStatus status) throws HyracksException {
if (jobResultLocations.get(jobId) != null) {
throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
}
@@ -85,12 +87,13 @@
}
@Override
- public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
jobResultLocations.get(jobId).getRecord().start();
}
@Override
- public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
if (exceptions == null || exceptions.isEmpty()) {
final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
if (resultJobRecord == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
index 008be29..19fdcfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,7 +48,8 @@
private final Set<JobId> finishWithoutStart = new HashSet<>();
@Override
- public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
+ public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
+ throws HyracksException {
if (created.containsKey(jobId)) {
LOGGER.log(Level.WARN, "Job " + jobId + "has been created before");
increment(doubleCreated, jobId);
@@ -62,7 +64,7 @@
}
@Override
- public void notifyJobStart(JobId jobId) throws HyracksException {
+ public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
if (!created.containsKey(jobId)) {
LOGGER.log(Level.WARN, "Job " + jobId + "has not been created");
startWithoutCreate.add(jobId);
@@ -75,7 +77,8 @@
}
@Override
- public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions)
+ throws HyracksException {
if (!started.contains(jobId)) {
LOGGER.log(Level.WARN, "Job " + jobId + "has not been started");
finishWithoutStart.add(jobId);