[ASTERIXDB-2513][FUN] Add Active_Requests Function
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add a datasource function (active_requests) which
returns the active jobs that the user specified
client_context_id for.
- This function runs on a single NC and uses messaging
to get the currently running jobs from CC.
- Currently, the function returns the following fields:
-- clientContextId: the user specified clientContextId.
-- requestTime: a timestamp at which the request reference
was created.
-- jobId: optionally, the job id that belongs to this request.
- The function may be improved later to return all jobs and it may
return additional fields such as (request uuid, statement,
executionTime, elapsedTime, nodeAddress, userAgent, etc..)
- Add test case.
- Do not allow cancellation test to cancel queries with
clientContextId to avoid intermittent failures.
Change-Id: I95962742161ed18c4cf2e09c8541c8ad3b35356c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3136
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index f9f5677..369d93b 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -242,5 +242,9 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-util</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index a9bd856..ec44d60 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -20,10 +20,15 @@
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.om.base.ADateTime;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
public abstract class BaseClientRequest implements IClientRequest {
protected final IStatementExecutorContext ctx;
+ protected final long requestTime = System.currentTimeMillis();
protected final String contextId;
private boolean complete;
@@ -50,5 +55,21 @@
doCancel(appCtx);
}
+ @Override
+ public String toJson() {
+ try {
+ return JSONUtil.convertNode(asJson());
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ protected ObjectNode asJson() {
+ ObjectNode json = JSONUtil.createObject();
+ json.put("requestTime", new ADateTime(requestTime).toSimpleString());
+ json.put("clientContextID", contextId);
+ return json;
+ }
+
protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
index 520ce03..81714ca 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
@@ -22,6 +22,9 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
public class ClientJobRequest extends BaseClientRequest {
private final JobId jobId;
@@ -41,4 +44,15 @@
}
ctx.remove(contextId);
}
+
+ @Override
+ public String toJson() {
+ final ObjectNode jsonNode = super.asJson();
+ jsonNode.put("jobId", jobId.toString());
+ try {
+ return JSONUtil.convertNode(jsonNode);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 78080f3..29e7bda 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -19,6 +19,8 @@
package org.apache.asterix.translator;
+import java.util.Map;
+
import org.apache.asterix.common.api.IClientRequest;
/**
@@ -52,4 +54,9 @@
* a user provided client context id.
*/
IClientRequest remove(String clientContextId);
+
+ /**
+ * @return The currently running requests
+ */
+ Map<String, IClientRequest> getRunningRequests();
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
index c4e2859..a2a7906 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.translator;
+import java.util.Collections;
+import java.util.Map;
+
import org.apache.asterix.common.api.IClientRequest;
public class NoOpStatementExecutorContext implements IStatementExecutorContext {
@@ -42,4 +45,8 @@
return null;
}
+ @Override
+ public Map<String, IClientRequest> getRunningRequests() {
+ return Collections.emptyMap();
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
index a4da189..136fda7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.api.http.ctx;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,4 +43,9 @@
public IClientRequest remove(String clientContextId) {
return runningQueries.remove(clientContextId);
}
+
+ @Override
+ public Map<String, IClientRequest> getRunningRequests() {
+ return Collections.unmodifiableMap(runningQueries);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
new file mode 100644
index 0000000..6d32763
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsDatasource.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class ActiveRequestsDatasource extends FunctionDataSource {
+
+ private static final DataSourceId ACTIVE_REQUESTS_DATASOURCE_ID = new DataSourceId(
+ ActiveRequestsRewriter.ACTIVE_REQUESTS.getNamespace(), ActiveRequestsRewriter.ACTIVE_REQUESTS.getName());
+
+ public ActiveRequestsDatasource(INodeDomain domain) throws AlgebricksException {
+ super(ACTIVE_REQUESTS_DATASOURCE_ID, domain);
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ AlgebricksAbsolutePartitionConstraint randomLocation =
+ AlgebricksAbsolutePartitionConstraint.randomLocation(locations.getLocations());
+ return new ActiveRequestsFunction(randomLocation);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
new file mode 100644
index 0000000..7c621f1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsFunction.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.app.message.ActiveRequestsRequest;
+import org.apache.asterix.app.message.ActiveRequestsResponse;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ActiveRequestsFunction extends AbstractDatasourceFunction {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public ActiveRequestsFunction(AlgebricksAbsolutePartitionConstraint locations) {
+ super(locations);
+ }
+
+ @Override
+ public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext();
+ INCMessageBroker messageBroker = (INCMessageBroker) serviceCtx.getMessageBroker();
+ MessageFuture messageFuture = messageBroker.registerMessageFuture();
+ long futureId = messageFuture.getFutureId();
+ ActiveRequestsRequest request = new ActiveRequestsRequest(serviceCtx.getNodeId(), futureId);
+ try {
+ messageBroker.sendMessageToPrimaryCC(request);
+ ActiveRequestsResponse response =
+ (ActiveRequestsResponse) messageFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return new ActiveRequestsReader(response.getRequests());
+ } catch (Exception e) {
+ LOGGER.warn("Could not retrieve active requests", e);
+ throw HyracksDataException.create(e);
+ } finally {
+ messageBroker.deregisterMessageFuture(futureId);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
new file mode 100644
index 0000000..9c9ebd9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+
+public class ActiveRequestsReader extends FunctionReader {
+
+ private final String[] activeRequests;
+ private CharArrayRecord record;
+ private int recordIndex;
+
+ public ActiveRequestsReader(String[] activeRequests) {
+ this.activeRequests = activeRequests;
+ record = new CharArrayRecord();
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return recordIndex < activeRequests.length;
+ }
+
+ @Override
+ public IRawRecord<char[]> next() throws IOException {
+ record.reset();
+ record.append((activeRequests[recordIndex++]).toCharArray());
+ record.endRecord();
+ return record;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
new file mode 100644
index 0000000..b0daabb
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/ActiveRequestsRewriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ActiveRequestsRewriter extends FunctionRewriter {
+
+ public static final FunctionIdentifier ACTIVE_REQUESTS =
+ new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "active-requests", 0);
+ public static final ActiveRequestsRewriter INSTANCE = new ActiveRequestsRewriter(ACTIVE_REQUESTS);
+
+ private ActiveRequestsRewriter(FunctionIdentifier functionId) {
+ super(functionId);
+ }
+
+ @Override
+ protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+ throws AlgebricksException {
+ return new ActiveRequestsDatasource(context.getComputationNodeDomain());
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
new file mode 100644
index 0000000..9d15131
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.Collection;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ActiveRequestsRequest implements ICcAddressedMessage {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final long reqId;
+
+ public ActiveRequestsRequest(String nodeId, long reqId) {
+ this.nodeId = nodeId;
+ this.reqId = reqId;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
+ CCApplication application = (CCApplication) ccs.getApplication();
+ IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
+ final Collection<IClientRequest> runningRequests = executorsCtx.getRunningRequests().values();
+ final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
+ ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests);
+ CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ try {
+ messageBroker.sendApplicationMessageToNC(response, nodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
new file mode 100644
index 0000000..0bf7976
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ActiveRequestsResponse implements INcAddressedMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final long reqId;
+ private final String[] requests;
+
+ public ActiveRequestsResponse(long reqId, String[] requests) {
+ this.reqId = reqId;
+ this.requests = requests;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ MessageFuture future = mb.deregisterMessageFuture(reqId);
+ if (future != null) {
+ future.complete(this);
+ }
+ }
+
+ public String[] getRequests() {
+ return requests;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 83ceec7..3407d59 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.util;
+import org.apache.asterix.app.function.ActiveRequestsRewriter;
import org.apache.asterix.app.function.DatasetResourcesRewriter;
import org.apache.asterix.app.function.DatasetRewriter;
import org.apache.asterix.app.function.FeedRewriter;
@@ -54,6 +55,11 @@
(expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
BuiltinFunctions.addUnnestFun(PingRewriter.PING, true);
BuiltinFunctions.addDatasourceFunction(PingRewriter.PING, PingRewriter.INSTANCE);
+ // Active requests function
+ BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS,
+ (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true);
+ BuiltinFunctions.addUnnestFun(ActiveRequestsRewriter.ACTIVE_REQUESTS, true);
+ BuiltinFunctions.addDatasourceFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, ActiveRequestsRewriter.INSTANCE);
}
private MetadataBuiltinFunctions() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index e85fedf..48d11ee 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -51,6 +51,7 @@
public InputStream executeQueryService(String str, TestCaseContext.OutputFormat fmt, URI uri,
List<TestCase.CompilationUnit.Parameter> params, boolean jsonEncoded,
Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
+ cancellable = cancellable && !containsClientContextID(str);
String clientContextId = UUID.randomUUID().toString();
final List<TestCase.CompilationUnit.Parameter> newParams = cancellable
? upsertParam(params, "client_context_id", ParameterTypeEnum.STRING, clientContextId) : params;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 4766639..282d83d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1999,6 +1999,12 @@
}
}
+ protected static boolean containsClientContextID(String statement) {
+ List<Parameter> httpParams = extractParameters(statement);
+ return httpParams.stream().map(Parameter::getName)
+ .anyMatch(QueryServiceServlet.Parameter.CLIENT_ID.str()::equals);
+ }
+
private static boolean isCancellable(String type) {
return !NON_CANCELLABLE.contains(type);
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
new file mode 100644
index 0000000..ec96a51
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.1.async.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+-- param client_context_id=sleep_async_query
+-- handlevariable=status
+
+select value sleep("result", 10000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
new file mode 100644
index 0000000..3a29ef2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_running_query
+ -- polltimeoutsecs=15
+SELECT VALUE rqst FROM active_requests() rqst
+WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
new file mode 100644
index 0000000..1881f1a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.3.pollquery.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ -- param client_context_id=ensure_completed_query
+ -- polltimeoutsecs=15
+SELECT VALUE rqst FROM active_requests() rqst
+WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.1.ignore
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
new file mode 100644
index 0000000..92f4746
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -0,0 +1 @@
+/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.3.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 8156244..ceee5f9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -4982,6 +4982,11 @@
<output-dir compare="Text">p_sort_num_samples</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="misc">
+ <compilation-unit name="active_requests">
+ <output-dir compare="Text">active_requests</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index">
<test-group name="index/validations">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 30759de..53771d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -35,4 +35,9 @@
* @throws HyracksDataException
*/
void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+
+ /**
+ * @return A json representation of this request
+ */
+ String toJson();
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
index efcb828..62e5c87 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
@@ -118,11 +118,15 @@
return sbder.toString();
}
- public String toSimpleString() throws IOException {
+ public String toSimpleString() {
StringBuilder sbder = new StringBuilder();
- GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
- GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
- return sbder.toString();
+ try {
+ GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
+ GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
+ return sbder.toString();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
public long getChrononTime() {
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index fa4a707..b53f4d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.algebricks.common.constraints;
import java.util.Arrays;
+import java.util.Random;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -33,6 +34,11 @@
Arrays.sort(sortedLocations);
}
+ public static AlgebricksAbsolutePartitionConstraint randomLocation(String[] locations) {
+ int randomIndex = new Random().nextInt(locations.length);
+ return new AlgebricksAbsolutePartitionConstraint(new String[] { locations[randomIndex] });
+ }
+
@Override
public PartitionConstraintType getPartitionConstraintType() {
return PartitionConstraintType.ABSOLUTE;