[ASTERIXDB-3343][API] Add redact param to redact active/completed requests
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add "redact" param to redact active/completed requests.
- Make sure a completed request info is updated even if
the request was cancelled.
Change-Id: Ied406f4d803c5ca717e7ed6280550a3e96142fe4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 25fcbf5..0fafc47 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -94,6 +94,15 @@
@Override
public ObjectNode asJson() {
+ return putJson();
+ }
+
+ @Override
+ public ObjectNode asRedactedJson() {
+ return putJson();
+ }
+
+ private ObjectNode putJson() {
ObjectNode json = JSONUtil.createObject();
json.put("uuid", requestReference.getUuid());
json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 9f2cbdd..31f1979 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -95,11 +96,21 @@
@Override
public ObjectNode asJson() {
ObjectNode json = super.asJson();
- putJobDetails(json);
- json.put("statement", statement);
+ return asJson(json, false);
+ }
+
+ @Override
+ public ObjectNode asRedactedJson() {
+ ObjectNode json = super.asRedactedJson();
+ return asJson(json, true);
+ }
+
+ private ObjectNode asJson(ObjectNode json, boolean redact) {
+ putJobDetails(json, redact);
+ json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement);
json.put("clientContextID", clientContextId);
if (plan != null) {
- json.put("plan", plan);
+ json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
}
return json;
}
@@ -132,16 +143,16 @@
return ExceptionUtils.unwrap(e).getMessage();
}
- private void putJobDetails(ObjectNode json) {
+ private void putJobDetails(ObjectNode json, boolean redact) {
try {
json.put("jobId", jobId != null ? jobId.toString() : null);
- putJobState(json, jobState);
+ putJobState(json, jobState, redact);
} catch (Throwable th) {
// ignore
}
}
- private static void putJobState(ObjectNode json, JobState state) {
+ private static void putJobState(ObjectNode json, JobState state, boolean redact) {
AMutableDateTime dateTime = new AMutableDateTime(0);
putTime(json, state.createTime, "jobCreateTime", dateTime);
putTime(json, state.startTime, "jobStartTime", dateTime);
@@ -152,7 +163,7 @@
json.put("jobRequiredCPUs", state.requiredCPUs);
json.put("jobRequiredMemory", state.requiredMemoryInBytes);
if (state.errorMsg != null) {
- json.put("error", state.errorMsg);
+ json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
index 285c4c8..b6c6a71 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -35,6 +35,7 @@
public abstract class AbstractRequestsServlet extends AbstractServlet {
+ public static final String REDACT_PARAM = "redact";
protected final ICcApplicationContext appCtx;
public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
@@ -46,8 +47,15 @@
protected void get(IServletRequest request, IServletResponse response) throws Exception {
ArrayNode requestsJson = JSONUtil.createArray();
Collection<IClientRequest> requests = getRequests();
- for (IClientRequest req : requests) {
- requestsJson.add(req.asJson());
+ String redact = request.getParameter(REDACT_PARAM);
+ if (Boolean.parseBoolean(redact)) {
+ for (IClientRequest req : requests) {
+ requestsJson.add(req.asRedactedJson());
+ }
+ } else {
+ for (IClientRequest req : requests) {
+ requestsJson.add(req.asJson());
+ }
}
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
response.setStatus(HttpResponseStatus.OK);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e..07f8eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
*/
-- param client_context_id=ensure_running_query
-- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst
WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
new file mode 100644
index 0000000..de35939
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b..170838e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000..15affd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+ "cancellable": true,
+ "clientContextID": "completed_requests_query",
+ "elapsedTime": "R{.*}",
+ "jobCreateTime": "R{.*}",
+ "jobEndTime": "R{.*}",
+ "jobId": "R{.*}",
+ "jobQueueTime": "R{.*}",
+ "jobRequiredCPUs": "R{.*}",
+ "jobRequiredMemory": "R{.*}",
+ "jobStartTime": "R{.*}",
+ "jobStatus": "TERMINATED",
+ "node": "R{.*}",
+ "remoteAddr": "R{.*}",
+ "requestTime": "R{.*}",
+ "state": "completed",
+ "userAgent": "R{.*}",
+ "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 8d2c91a..ee518dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -104,6 +104,11 @@
ObjectNode asJson();
/**
+ * @return A redacted json node representation of this request
+ */
+ ObjectNode asRedactedJson();
+
+ /**
* Called when the job is created.
*
* @param jobId the job id
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 5771201..9875651 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -18,9 +18,9 @@
*/
package org.apache.asterix.runtime.utils;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -30,7 +30,6 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
@@ -42,14 +41,22 @@
private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
- private final CircularFifoQueue<IClientRequest> completedRequests;
+ private final Map<String, IClientRequest> completedRequests;
private final ICcApplicationContext ccAppCtx;
private final AtomicLong numRequests;
private final AtomicLong numOfFailedRequests;
public RequestTracker(ICcApplicationContext ccAppCtx) {
this.ccAppCtx = ccAppCtx;
- completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+ int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+ completedRequests = new LinkedHashMap<>(archiveSize) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) {
+ return size() > archiveSize;
+ }
+ };
numRequests = new AtomicLong(0);
numOfFailedRequests = new AtomicLong(0);
}
@@ -102,7 +109,7 @@
@Override
public synchronized Collection<IClientRequest> getCompletedRequests() {
- return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+ return Collections.unmodifiableCollection(completedRequests.values());
}
private void cancel(IClientRequest request) throws HyracksDataException {
@@ -122,7 +129,7 @@
}
private synchronized void archive(IClientRequest request) {
- completedRequests.add(request);
+ completedRequests.put(request.getId(), request);
}
public long getTotalNumberOfRequests() {
@@ -139,13 +146,14 @@
return numOfFailedRequests.get();
}
+ @Override
public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
throws HyracksException {
String requestId = spec.getRequestId();
if (requestId != null) {
- IClientRequest clientRequest = runningRequests.get(requestId);
- if (clientRequest != null) {
- clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
}
}
}
@@ -154,9 +162,9 @@
public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
String requestId = spec.getRequestId();
if (requestId != null) {
- IClientRequest clientRequest = runningRequests.get(requestId);
- if (clientRequest != null) {
- clientRequest.jobStarted(jobId);
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobStarted(jobId);
}
}
}
@@ -166,10 +174,20 @@
throws HyracksException {
String requestId = spec.getRequestId();
if (requestId != null) {
- IClientRequest clientRequest = runningRequests.get(requestId);
- if (clientRequest != null) {
- clientRequest.jobFinished(jobId, jobStatus, exceptions);
+ IClientRequest request = getRequest(requestId);
+ if (request != null) {
+ request.jobFinished(jobId, jobStatus, exceptions);
}
}
}
+
+ private IClientRequest getRequest(String requestId) {
+ IClientRequest clientRequest = runningRequests.get(requestId);
+ if (clientRequest != null) {
+ return clientRequest;
+ }
+ synchronized (this) {
+ return completedRequests.get(requestId);
+ }
+ }
}