[NO ISSUE][FUN] Add Completed_Requests Function
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add completed_requests to get recently completed
requests.
- Use enum for requests state.
- Add new config to specify the requests archive
size.
- Add test case for completed_requests function.
Change-Id: I3f47d523c683c3879ec52ce5bdaf16ce338e8e46
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3301
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index 50e6cc2..b7ef4e6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -33,7 +33,7 @@
private boolean complete;
private final IRequestReference requestReference;
private boolean cancellable = false;
- protected volatile String state = "received";
+ protected volatile State state = State.RECEIVED;
public BaseClientRequest(IRequestReference requestReference) {
this.requestReference = requestReference;
@@ -45,6 +45,7 @@
return;
}
complete = true;
+ state = State.COMPLETED;
}
@Override
@@ -53,6 +54,7 @@
return;
}
complete();
+ state = State.CANCELLED;
if (cancellable) {
doCancel(appCtx);
}
@@ -76,7 +78,7 @@
}
public void setRunning() {
- state = "running";
+ state = State.RUNNING;
}
@Override
@@ -90,7 +92,7 @@
json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
json.put("elapsedTime", getElapsedTime());
json.put("node", requestReference.getNode());
- json.put("state", state);
+ json.put("state", state.getLabel());
json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
json.put("cancellable", cancellable);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
index 6d32763..eb5416b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.app.function;
+import org.apache.asterix.app.message.ClientRequestsRequest;
import org.apache.asterix.metadata.api.IDatasourceFunction;
import org.apache.asterix.metadata.declared.DataSourceId;
import org.apache.asterix.metadata.declared.FunctionDataSource;
@@ -40,6 +41,6 @@
AlgebricksAbsolutePartitionConstraint locations) {
AlgebricksAbsolutePartitionConstraint randomLocation =
AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations());
- return new ActiveRequestsFunction(randomLocation);
+ return new ClientRequestsFunction(randomLocation, ClientRequestsRequest.RequestType.RUNNING);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java
similarity index 77%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java
index 7c621f1..d0e1b27 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsFunction.java
@@ -18,12 +18,13 @@
*/
package org.apache.asterix.app.function;
+import static org.apache.asterix.app.message.ClientRequestsRequest.RequestType;
import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
import java.util.concurrent.TimeUnit;
-import org.apache.asterix.app.message.ActiveRequestsRequest;
-import org.apache.asterix.app.message.ActiveRequestsResponse;
+import org.apache.asterix.app.message.ClientRequestsRequest;
+import org.apache.asterix.app.message.ClientRequestsResponse;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.external.api.IRecordReader;
@@ -35,13 +36,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class ActiveRequestsFunction extends AbstractDatasourceFunction {
+public class ClientRequestsFunction extends AbstractDatasourceFunction {
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 1L;
+ private final RequestType requestType;
- public ActiveRequestsFunction(AlgebricksAbsolutePartitionConstraint locations) {
+ public ClientRequestsFunction(AlgebricksAbsolutePartitionConstraint locations, RequestType requestType) {
super(locations);
+ this.requestType = requestType;
}
@Override
@@ -51,12 +54,12 @@
INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker();
MessageFuture messageFuture = messageBroker.registerMessageFuture();
long futureId = messageFuture.getFutureId();
- ActiveRequestsRequest request = new ActiveRequestsRequest(serviceCtx.getNodeId(), futureId);
+ ClientRequestsRequest request = new ClientRequestsRequest(serviceCtx.getNodeId(), futureId, requestType);
try {
messageBroker.sendMessageToPrimaryCC(request);
- ActiveRequestsResponse response =
- (ActiveRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- return new ActiveRequestsReader(response.getRequests());
+ ClientRequestsResponse response =
+ (ClientRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return new ClientRequestsReader(response.getRequests());
} catch (Exception e) {
LOGGER.warn("Could not retrieve active requests", e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java
similarity index 80%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java
index 9c9ebd9..a5b6370 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ClientRequestsReader.java
@@ -23,26 +23,26 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.input.record.CharArrayRecord;
-public class ActiveRequestsReader extends FunctionReader {
+public class ClientRequestsReader extends FunctionReader {
- private final String[] activeRequests;
+ private final String[] requests;
private CharArrayRecord record;
private int recordIndex;
- public ActiveRequestsReader(String[] activeRequests) {
- this.activeRequests = activeRequests;
+ public ClientRequestsReader(String[] requests) {
+ this.requests = requests;
record = new CharArrayRecord();
}
@Override
public boolean hasNext() throws Exception {
- return recordIndex < activeRequests.length;
+ return recordIndex < requests.length;
}
@Override
public IRawRecord<char[]> next() throws IOException {
record.reset();
- record.append((activeRequests[recordIndex++]).toCharArray());
+ record.append((requests[recordIndex++]).toCharArray());
record.endRecord();
return record;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java
new file mode 100644
index 0000000..4c3672b
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsDatasource.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.app.message.ClientRequestsRequest;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class CompletedRequestsDatasource extends FunctionDataSource {
+
+ private static final DataSourceId COMPLETED_REQUESTS_DATASOURCE_ID =
+ new DataSourceId(ActiveRequestsRewriter.ACTIVE_REQUESTS.getNamespace(),
+ CompletedRequestsRewriter.COMPLETED_REQUESTS.getName());
+
+ public CompletedRequestsDatasource(INodeDomain domain) throws AlgebricksException {
+ super(COMPLETED_REQUESTS_DATASOURCE_ID, domain);
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ AlgebricksAbsolutePartitionConstraint randomLocation =
+ AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations());
+ return new ClientRequestsFunction(randomLocation, ClientRequestsRequest.RequestType.COMPLETED);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java
new file mode 100644
index 0000000..df6bd3e
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/CompletedRequestsRewriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class CompletedRequestsRewriter extends FunctionRewriter {
+
+ public static final FunctionIdentifier COMPLETED_REQUESTS =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "completed-requests", 0);
+ public static final CompletedRequestsRewriter INSTANCE = new CompletedRequestsRewriter(COMPLETED_REQUESTS);
+
+ private CompletedRequestsRewriter(FunctionIdentifier functionId) {
+ super(functionId);
+ }
+
+ @Override
+ protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+ throws AlgebricksException {
+ return new CompletedRequestsDatasource(context.getComputationNodeDomain());
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java
similarity index 68%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java
index f1c2a9c..32667d8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsRequest.java
@@ -29,23 +29,40 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class ActiveRequestsRequest implements ICcAddressedMessage {
+public class ClientRequestsRequest implements ICcAddressedMessage {
+
+ public enum RequestType {
+ RUNNING,
+ COMPLETED
+ }
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 1L;
private final String nodeId;
private final long reqId;
+ private final RequestType requestType;
- public ActiveRequestsRequest(String nodeId, long reqId) {
+ public ClientRequestsRequest(String nodeId, long reqId, RequestType requestType) {
this.nodeId = nodeId;
this.reqId = reqId;
+ this.requestType = requestType;
}
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- final Collection<IClientRequest> runningRequests = appCtx.getRequestTracker().getRunningRequests();
- final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
- ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests);
+ Collection<IClientRequest> clientRequests;
+ switch (requestType) {
+ case RUNNING:
+ clientRequests = appCtx.getRequestTracker().getRunningRequests();
+ break;
+ case COMPLETED:
+ clientRequests = appCtx.getRequestTracker().getCompletedRequests();
+ break;
+ default:
+ throw new IllegalStateException("unrecognized request type: " + requestType);
+ }
+ final String[] requests = clientRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
+ ClientRequestsResponse response = new ClientRequestsResponse(reqId, requests);
CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
try {
messageBroker.sendApplicationMessageToNC(response, nodeId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java
similarity index 93%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java
index 0bf7976..5c25208 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ClientRequestsResponse.java
@@ -24,13 +24,13 @@
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class ActiveRequestsResponse implements INcAddressedMessage {
+public class ClientRequestsResponse implements INcAddressedMessage {
private static final long serialVersionUID = 1L;
private final long reqId;
private final String[] requests;
- public ActiveRequestsResponse(long reqId, String[] requests) {
+ public ClientRequestsResponse(long reqId, String[] requests) {
this.reqId = reqId;
this.requests = requests;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index a8314ec..738d6b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -19,6 +19,7 @@
package org.apache.asterix.util;
import org.apache.asterix.app.function.ActiveRequestsRewriter;
+import org.apache.asterix.app.function.CompletedRequestsRewriter;
import org.apache.asterix.app.function.DatasetResourcesRewriter;
import org.apache.asterix.app.function.DatasetRewriter;
import org.apache.asterix.app.function.FeedRewriter;
@@ -66,6 +67,12 @@
(expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
BuiltinFunctions.addUnnestFun(JobSummariesRewriter.JOBSUMMARIES, true);
BuiltinFunctions.addDatasourceFunction(JobSummariesRewriter.JOBSUMMARIES, JobSummariesRewriter.INSTANCE);
+ // completed requests function
+ BuiltinFunctions.addFunction(CompletedRequestsRewriter.COMPLETED_REQUESTS,
+ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+ BuiltinFunctions.addUnnestFun(CompletedRequestsRewriter.COMPLETED_REQUESTS, true);
+ BuiltinFunctions.addDatasourceFunction(CompletedRequestsRewriter.COMPLETED_REQUESTS,
+ CompletedRequestsRewriter.INSTANCE);
}
private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 801bd0f..348b947 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -31,6 +31,7 @@
import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.api.RequestReference;
+import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.runtime.utils.RequestTracker;
import org.apache.asterix.translator.ClientRequest;
@@ -50,6 +51,9 @@
@Test
public void testDelete() throws Exception {
ICcApplicationContext appCtx = mock(ICcApplicationContext.class);
+ ExternalProperties externalProperties = mock(ExternalProperties.class);
+ Mockito.when(externalProperties.getRequestsArchiveSize()).thenReturn(50);
+ Mockito.when(appCtx.getExternalProperties()).thenReturn(externalProperties);
RequestTracker tracker = new RequestTracker(appCtx);
Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker);
// Creates a query cancellation servlet.
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp
new file mode 100644
index 0000000..aaf08c8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.1.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- param client_context_id=completed_requests_query
+
+select value "anything";
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp
new file mode 100644
index 0000000..1bfb32f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.3.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+
+LET request_count = (SELECT VALUE COUNT(*) FROM completed_requests() r
+WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0]
+SELECT VALUE request_count > 0;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm
new file mode 100644
index 0000000..ea7c79d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.1.adm
@@ -0,0 +1 @@
+"anything"
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm
new file mode 100644
index 0000000..f32a580
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.2.adm
@@ -0,0 +1 @@
+true
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index aeb79e4..5258102 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5752,6 +5752,11 @@
<output-dir compare="Text">jobs</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="misc">
+ <compilation-unit name="completed_requests">
+ <output-dir compare="Text">completed_requests</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index">
<test-group name="index/validations">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 430cd2a..515c837 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -23,6 +23,24 @@
public interface IClientRequest {
+ enum State {
+
+ RECEIVED("received"),
+ RUNNING("running"),
+ CANCELLED("cancelled"),
+ COMPLETED("completed");
+
+ private final String label;
+
+ State(String label) {
+ this.label = label;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+ }
+
/**
* A system wide unique id representing this {@link IClientRequest}
*
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index dc165b4e..a3ddb30 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -68,4 +68,11 @@
* @return the currently running requests
*/
Collection<IClientRequest> getRunningRequests();
+
+ /**
+ * Gets the recently completed requests
+ *
+ * @return the recently completed requests
+ */
+ Collection<IClientRequest> getCompletedRequests();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 620e47c..1533c9f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -48,7 +48,8 @@
MAX_WEB_REQUEST_SIZE(
UNSIGNED_INTEGER,
StorageUtil.getIntSizeInBytes(50, StorageUtil.StorageUnit.MEGABYTE),
- "The maximum accepted web request size in bytes");
+ "The maximum accepted web request size in bytes"),
+ REQUESTS_ARCHIVE_SIZE(UNSIGNED_INTEGER, 50, "The maximum number of archived requests to maintain");
private final IOptionType type;
private final Object defaultValue;
@@ -67,6 +68,7 @@
case WEB_QUERYINTERFACE_PORT:
case API_PORT:
case ACTIVE_PORT:
+ case REQUESTS_ARCHIVE_SIZE:
return Section.CC;
case NC_API_PORT:
return Section.NC;
@@ -141,4 +143,8 @@
public int getMaxWebRequestSize() {
return accessor.getInt(Option.MAX_WEB_REQUEST_SIZE);
}
+
+ public int getRequestsArchiveSize() {
+ return accessor.getInt(Option.REQUESTS_ARCHIVE_SIZE);
+ }
}
diff --git a/asterixdb/asterix-runtime/pom.xml b/asterixdb/asterix-runtime/pom.xml
index c7e7071..7b6faf0 100644
--- a/asterixdb/asterix-runtime/pom.xml
+++ b/asterixdb/asterix-runtime/pom.xml
@@ -200,5 +200,9 @@
<artifactId>fastutil</artifactId>
<version>8.2.2</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index a0ab559..a8749b1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.runtime.utils;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -27,16 +28,19 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class RequestTracker implements IRequestTracker {
private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
+ private final CircularFifoQueue<IClientRequest> completedRequests;
private final ICcApplicationContext ccAppCtx;
public RequestTracker(ICcApplicationContext ccAppCtx) {
this.ccAppCtx = ccAppCtx;
+ completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
}
@Override
@@ -84,6 +88,11 @@
return Collections.unmodifiableCollection(runningRequests.values());
}
+ @Override
+ public synchronized Collection<IClientRequest> getCompletedRequests() {
+ return Collections.unmodifiableCollection(new ArrayList<>(completedRequests));
+ }
+
private void cancel(IClientRequest request) throws HyracksDataException {
request.cancel(ccAppCtx);
untrack(request);
@@ -95,5 +104,10 @@
if (clientContextId != null) {
clientIdRequests.remove(request.getClientContextId());
}
+ archive(request);
+ }
+
+ private synchronized void archive(IClientRequest request) {
+ completedRequests.add(request);
}
}