[ASTERIXDB-3343][API] Add redact param to redact active/completed requests

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add "redact" param to redact active/completed requests.
- Make sure a completed request info is updated even if
  the request was cancelled.

Change-Id: Ied406f4d803c5ca717e7ed6280550a3e96142fe4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 25fcbf5..0fafc47 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -94,6 +94,15 @@
 
     @Override
     public ObjectNode asJson() {
+        return putJson();
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        return putJson();
+    }
+
+    private ObjectNode putJson() {
         ObjectNode json = JSONUtil.createObject();
         json.put("uuid", requestReference.getUuid());
         json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
index 9f2cbdd..31f1979 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -33,6 +33,7 @@
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.util.LogRedactionUtil;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
@@ -95,11 +96,21 @@
     @Override
     public ObjectNode asJson() {
         ObjectNode json = super.asJson();
-        putJobDetails(json);
-        json.put("statement", statement);
+        return asJson(json, false);
+    }
+
+    @Override
+    public ObjectNode asRedactedJson() {
+        ObjectNode json = super.asRedactedJson();
+        return asJson(json, true);
+    }
+
+    private ObjectNode asJson(ObjectNode json, boolean redact) {
+        putJobDetails(json, redact);
+        json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement);
         json.put("clientContextID", clientContextId);
         if (plan != null) {
-            json.put("plan", plan);
+            json.put("plan", redact ? LogRedactionUtil.userData(plan) : plan);
         }
         return json;
     }
@@ -132,16 +143,16 @@
         return ExceptionUtils.unwrap(e).getMessage();
     }
 
-    private void putJobDetails(ObjectNode json) {
+    private void putJobDetails(ObjectNode json, boolean redact) {
         try {
             json.put("jobId", jobId != null ? jobId.toString() : null);
-            putJobState(json, jobState);
+            putJobState(json, jobState, redact);
         } catch (Throwable th) {
             // ignore
         }
     }
 
-    private static void putJobState(ObjectNode json, JobState state) {
+    private static void putJobState(ObjectNode json, JobState state, boolean redact) {
         AMutableDateTime dateTime = new AMutableDateTime(0);
         putTime(json, state.createTime, "jobCreateTime", dateTime);
         putTime(json, state.startTime, "jobStartTime", dateTime);
@@ -152,7 +163,7 @@
         json.put("jobRequiredCPUs", state.requiredCPUs);
         json.put("jobRequiredMemory", state.requiredMemoryInBytes);
         if (state.errorMsg != null) {
-            json.put("error", state.errorMsg);
+            json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
index 285c4c8..b6c6a71 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java
@@ -35,6 +35,7 @@
 
 public abstract class AbstractRequestsServlet extends AbstractServlet {
 
+    public static final String REDACT_PARAM = "redact";
     protected final ICcApplicationContext appCtx;
 
     public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) {
@@ -46,8 +47,15 @@
     protected void get(IServletRequest request, IServletResponse response) throws Exception {
         ArrayNode requestsJson = JSONUtil.createArray();
         Collection<IClientRequest> requests = getRequests();
-        for (IClientRequest req : requests) {
-            requestsJson.add(req.asJson());
+        String redact = request.getParameter(REDACT_PARAM);
+        if (Boolean.parseBoolean(redact)) {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asRedactedJson());
+            }
+        } else {
+            for (IClientRequest req : requests) {
+                requestsJson.add(req.asJson());
+            }
         }
         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request);
         response.setStatus(HttpResponseStatus.OK);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 7eee42e..07f8eea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
  */
  -- param client_context_id=ensure_running_query
  -- polltimeoutsecs=15
-SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst
 WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
new file mode 100644
index 0000000..de35939
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- param ignoreextrafields=true
+SELECT VALUE (SELECT VALUE r FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index e31fe3b..170838e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
new file mode 100644
index 0000000..15affd1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson
@@ -0,0 +1,19 @@
+{
+    "cancellable": true,
+    "clientContextID": "completed_requests_query",
+    "elapsedTime": "R{.*}",
+    "jobCreateTime": "R{.*}",
+    "jobEndTime": "R{.*}",
+    "jobId": "R{.*}",
+    "jobQueueTime": "R{.*}",
+    "jobRequiredCPUs": "R{.*}",
+    "jobRequiredMemory": "R{.*}",
+    "jobStartTime": "R{.*}",
+    "jobStatus": "TERMINATED",
+    "node": "R{.*}",
+    "remoteAddr": "R{.*}",
+    "requestTime": "R{.*}",
+    "state": "completed",
+    "userAgent": "R{.*}",
+    "uuid": "R{.*}"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 8d2c91a..ee518dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -104,6 +104,11 @@
     ObjectNode asJson();
 
     /**
+     * @return A redacted json node representation of this request
+     */
+    ObjectNode asRedactedJson();
+
+    /**
      * Called when the job is created.
      *
      * @param jobId the job id
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 5771201..9875651 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -18,9 +18,9 @@
  */
 package org.apache.asterix.runtime.utils;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -30,7 +30,6 @@
 import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.api.IRequestTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
@@ -42,14 +41,22 @@
 
     private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
     private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
-    private final CircularFifoQueue<IClientRequest> completedRequests;
+    private final Map<String, IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
     private final AtomicLong numOfFailedRequests;
 
     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
-        completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
+        int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize();
+        completedRequests = new LinkedHashMap<>(archiveSize) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, IClientRequest> eldest) {
+                return size() > archiveSize;
+            }
+        };
         numRequests = new AtomicLong(0);
         numOfFailedRequests = new AtomicLong(0);
     }
@@ -102,7 +109,7 @@
 
     @Override
     public synchronized Collection<IClientRequest> getCompletedRequests() {
-        return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+        return Collections.unmodifiableCollection(completedRequests.values());
     }
 
     private void cancel(IClientRequest request) throws HyracksDataException {
@@ -122,7 +129,7 @@
     }
 
     private synchronized void archive(IClientRequest request) {
-        completedRequests.add(request);
+        completedRequests.put(request.getId(), request);
     }
 
     public long getTotalNumberOfRequests() {
@@ -139,13 +146,14 @@
         return numOfFailedRequests.get();
     }
 
+    @Override
     public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status)
             throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status);
             }
         }
     }
@@ -154,9 +162,9 @@
     public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobStarted(jobId);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobStarted(jobId);
             }
         }
     }
@@ -166,10 +174,20 @@
             throws HyracksException {
         String requestId = spec.getRequestId();
         if (requestId != null) {
-            IClientRequest clientRequest = runningRequests.get(requestId);
-            if (clientRequest != null) {
-                clientRequest.jobFinished(jobId, jobStatus, exceptions);
+            IClientRequest request = getRequest(requestId);
+            if (request != null) {
+                request.jobFinished(jobId, jobStatus, exceptions);
             }
         }
     }
+
+    private IClientRequest getRequest(String requestId) {
+        IClientRequest clientRequest = runningRequests.get(requestId);
+        if (clientRequest != null) {
+            return clientRequest;
+        }
+        synchronized (this) {
+            return completedRequests.get(requestId);
+        }
+    }
 }