[ASTERIXDB-2518][RT] Introduce Request Tracker
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Introduce IReceptionist to generate request references.
- Track all requests by uuid.
- Add more information to active_requests response.
- Replace StatementExecutorContext by RequestTracker.
- Deprecate StatementExecutorContext (to be removed)
- Allow extensions to set optional parameters in query service.
- Return forbidden when a cancellation is attempt on a request
that is not cancellable.
Change-Id: If08ecd91c55881743b2ecf40a628fa3d4166c554
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3163
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index 369d93b..ee0bb5e 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -246,5 +246,13 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-http</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
index ec44d60..50e6cc2 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -19,6 +19,8 @@
package org.apache.asterix.translator;
import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.om.base.ADateTime;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -27,14 +29,14 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
public abstract class BaseClientRequest implements IClientRequest {
- protected final IStatementExecutorContext ctx;
- protected final long requestTime = System.currentTimeMillis();
- protected final String contextId;
- private boolean complete;
- public BaseClientRequest(IStatementExecutorContext ctx, String contextId) {
- this.ctx = ctx;
- this.contextId = contextId;
+ private boolean complete;
+ private final IRequestReference requestReference;
+ private boolean cancellable = false;
+ protected volatile String state = "received";
+
+ public BaseClientRequest(IRequestReference requestReference) {
+ this.requestReference = requestReference;
}
@Override
@@ -43,7 +45,6 @@
return;
}
complete = true;
- ctx.remove(contextId);
}
@Override
@@ -52,24 +53,55 @@
return;
}
complete();
- doCancel(appCtx);
+ if (cancellable) {
+ doCancel(appCtx);
+ }
+ }
+
+ @Override
+ public synchronized void markCancellable() {
+ cancellable = true;
+ }
+
+ @Override
+ public String getId() {
+ // the uuid is generated by the node which received the request
+ // so there is a chance this might not be unique now
+ return requestReference.getUuid();
+ }
+
+ @Override
+ public synchronized boolean isCancellable() {
+ return cancellable;
+ }
+
+ public void setRunning() {
+ state = "running";
}
@Override
public String toJson() {
- try {
- return JSONUtil.convertNode(asJson());
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
+ return JSONUtil.convertNodeOrThrow(asJson());
}
protected ObjectNode asJson() {
ObjectNode json = JSONUtil.createObject();
- json.put("requestTime", new ADateTime(requestTime).toSimpleString());
- json.put("clientContextID", contextId);
+ json.put("uuid", requestReference.getUuid());
+ json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString());
+ json.put("elapsedTime", getElapsedTime());
+ json.put("node", requestReference.getNode());
+ json.put("state", state);
+ json.put("userAgent", ((RequestReference) requestReference).getUserAgent());
+ json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr());
+ json.put("cancellable", cancellable);
return json;
}
+ private String getElapsedTime() {
+ // this is just an estimation as the request might have been received on a node with a different system time
+ // TODO add dynamic time unit
+ return System.currentTimeMillis() - requestReference.getTime() + "ms";
+ }
+
protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
deleted file mode 100644
index 81714ca..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.translator;
-
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.util.JSONUtil;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class ClientJobRequest extends BaseClientRequest {
- private final JobId jobId;
-
- public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) {
- super(ctx, clientCtxId);
- this.jobId = jobId;
- }
-
- @Override
- protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
- IHyracksClientConnection hcc = appCtx.getHcc();
- try {
- hcc.cancelJob(jobId);
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
- ctx.remove(contextId);
- }
-
- @Override
- public String toJson() {
- final ObjectNode jsonNode = super.asJson();
- jsonNode.put("jobId", jobId.toString());
- try {
- return JSONUtil.convertNode(jsonNode);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
new file mode 100644
index 0000000..014bf3c
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class ClientRequest extends BaseClientRequest {
+
+ protected String statement;
+ protected JobId jobId;
+ protected Thread executor;
+ protected String clientContextId;
+
+ public ClientRequest(IRequestReference requestReference, String clientContextId, String statement,
+ Map<String, String> optionalParameters) {
+ super(requestReference);
+ this.clientContextId = clientContextId;
+ this.statement = statement;
+ this.executor = Thread.currentThread();
+ }
+
+ @Override
+ public String getClientContextId() {
+ return clientContextId;
+ }
+
+ public synchronized void setJobId(JobId jobId) {
+ this.jobId = jobId;
+ setRunning();
+ }
+
+ public Thread getExecutor() {
+ return executor;
+ }
+
+ @Override
+ protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
+ // if the request has a job, we abort the job and do not interrupt the thread as it will be notified
+ // that the job has been cancelled. Otherwise, we interrupt the thread
+ if (jobId != null) {
+ IHyracksClientConnection hcc = appCtx.getHcc();
+ try {
+ hcc.cancelJob(jobId);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } else if (executor != null) {
+ executor.interrupt();
+ }
+ }
+
+ @Override
+ protected ObjectNode asJson() {
+ ObjectNode json = super.asJson();
+ json.put("jobId", jobId.toString());
+ json.put("statement", statement);
+ json.put("clientContextID", clientContextId);
+ return json;
+ }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
index 58f0997..86ba301 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java
@@ -20,6 +20,7 @@
import java.util.Map;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.api.result.IResultSet;
@@ -67,4 +68,18 @@
* @return true if the request accepts multiple statements. Otherwise, false.
*/
boolean isMultiStatement();
+
+ /**
+ * Gets the statement the client provided with the request
+ *
+ * @return the request statement
+ */
+ String getStatement();
+
+ /**
+ * The request reference of this {@link IRequestParameters}
+ *
+ * @return the request reference
+ */
+ IRequestReference getRequestReference();
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 9bc86da..93eed8a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -110,12 +110,10 @@
* Compiles and executes a list of statements
*
* @param hcc
- * @param ctx
* @param requestParameters
* @throws Exception
*/
- void compileAndExecute(IHyracksClientConnection hcc, IStatementExecutorContext ctx,
- IRequestParameters requestParameters) throws Exception;
+ void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception;
/**
* rewrites and compiles query into a hyracks job specifications
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 29e7bda..9648036 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -24,8 +24,9 @@
import org.apache.asterix.common.api.IClientRequest;
/**
- * The context for statement executors. Maintains ongoing user requests.
+ * @deprecated (use IRequestTracker)
*/
+@Deprecated
public interface IStatementExecutorContext {
/**
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
new file mode 100644
index 0000000..8d06143
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
+import org.apache.asterix.common.api.RequestReference;
+import org.apache.http.HttpHeaders;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.api.IServletRequest;
+
+public class Receptionist implements IReceptionist {
+
+ private final String node;
+
+ public Receptionist(String node) {
+ this.node = node;
+ }
+
+ @Override
+ public IRequestReference welcome(IServletRequest request) {
+ final String uuid = UUID.randomUUID().toString();
+ final RequestReference ref = RequestReference.of(uuid, node, System.currentTimeMillis());
+ ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT));
+ //TODO set remote address
+ return ref;
+ }
+
+ @Override
+ public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
+ Map<String, String> optionalParameters) throws HyracksDataException {
+ return new ClientRequest(requestRef, clientContextId, statement, optionalParameters);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
deleted file mode 100644
index 136fda7..0000000
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.api.http.ctx;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.asterix.common.api.IClientRequest;
-import org.apache.asterix.translator.IStatementExecutorContext;
-
-public class StatementExecutorContext implements IStatementExecutorContext {
-
- private final Map<String, IClientRequest> runningQueries = new ConcurrentHashMap<>();
-
- @Override
- public IClientRequest get(String clientContextId) {
- return runningQueries.get(clientContextId);
- }
-
- @Override
- public void put(String clientContextId, IClientRequest req) {
- runningQueries.put(clientContextId, req);
- }
-
- @Override
- public IClientRequest remove(String clientContextId) {
- return runningQueries.remove(clientContextId);
- }
-
- @Override
- public Map<String, IClientRequest> getRunningRequests() {
- return Collections.unmodifiableMap(runningQueries);
- }
-}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index 9844900..b23fa1e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -120,10 +120,8 @@
return hcc;
}
- protected static UUID printRequestId(PrintWriter pw) {
- UUID requestId = UUID.randomUUID();
- ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId.toString());
- return requestId;
+ protected static void printRequestId(PrintWriter pw, String requestId) {
+ ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId);
}
protected static void printHandle(PrintWriter pw, String handle, boolean comma) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index d206336..ee3eb9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -33,6 +33,7 @@
import javax.imageio.ImageIO;
import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -89,6 +90,7 @@
@Override
protected void post(IServletRequest request, IServletResponse response) {
+ final IRequestReference requestReference = appCtx.getReceptionist().welcome(request);
// Query language
ILangCompilationProvider compilationProvider = "AQL".equals(request.getParameter("query-language"))
? aqlCompilationProvider : sqlppCompilationProvider;
@@ -149,10 +151,10 @@
compilationProvider, componentProvider);
double duration;
long startTime = System.currentTimeMillis();
- final IRequestParameters requestParameters =
- new RequestParameters(resultSet, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
- new IStatementExecutor.Stats(), null, null, null, null, true);
- translator.compileAndExecute(hcc, null, requestParameters);
+ final IRequestParameters requestParameters = new RequestParameters(requestReference, query, resultSet,
+ new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(),
+ null, null, null, null, true);
+ translator.compileAndExecute(hcc, requestParameters);
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
index 5f5692d..d260a0b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
@@ -23,8 +23,8 @@
import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter;
import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
@@ -54,17 +54,20 @@
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- IStatementExecutorContext executorCtx =
- (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
- IClientRequest req = executorCtx.get(clientContextId);
+ final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ final IClientRequest req = requestTracker.getByClientContextId(clientContextId);
if (req == null) {
// response: NOT FOUND
response.setStatus(HttpResponseStatus.NOT_FOUND);
return;
}
+ if (!req.isCancellable()) {
+ response.setStatus(HttpResponseStatus.FORBIDDEN);
+ return;
+ }
try {
// Cancels the on-going job.
- req.cancel(appCtx);
+ requestTracker.cancel(req.getId());
// response: OK
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 362f924..55f3369 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -20,7 +20,6 @@
package org.apache.asterix.api.http.server;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -33,6 +32,7 @@
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.Duration;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.ExceptionUtils;
@@ -69,10 +69,10 @@
}
@Override
- protected void executeStatement(String statementsText, SessionOutput sessionOutput,
- ResultProperties resultProperties, IStatementExecutor.Stats stats, QueryServiceRequestParameters param,
- RequestExecutionState execution, Map<String, String> optionalParameters,
- Map<String, byte[]> statementParameters) throws Exception {
+ protected void executeStatement(IRequestReference requestReference, String statementsText,
+ SessionOutput sessionOutput, ResultProperties resultProperties, IStatementExecutor.Stats stats,
+ QueryServiceRequestParameters param, RequestExecutionState execution,
+ Map<String, String> optionalParameters, Map<String, byte[]> statementParameters) throws Exception {
// Running on NC -> send 'execute' message to CC
INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -81,9 +81,6 @@
MessageFuture responseFuture = ncMb.registerMessageFuture();
final String handleUrl = getHandleUrl(param.getHost(), param.getPath(), delivery);
try {
- if (param.getClientContextID() == null) {
- param.setClientContextID(UUID.randomUUID().toString());
- }
long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
if (param.getTimeout() != null && !param.getTimeout().trim().isEmpty()) {
timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.getTimeout()));
@@ -91,7 +88,7 @@
ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(),
resultProperties.getNcToCcResultProperties(), param.getClientContextID(), handleUrl,
- optionalParameters, statementParameters, param.isMultiStatement());
+ optionalParameters, statementParameters, param.isMultiStatement(), requestReference);
execution.start();
ncMb.sendMessageToPrimaryCC(requestMsg);
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 99df372..a34f6b4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -43,6 +43,8 @@
import org.apache.asterix.common.api.Duration;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -64,7 +66,6 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.ResultProperties;
import org.apache.asterix.translator.SessionConfig;
@@ -93,7 +94,7 @@
private final ILangCompilationProvider compilationProvider;
private final IStatementExecutorFactory statementExecutorFactory;
private final IStorageComponentProvider componentProvider;
- private final IStatementExecutorContext queryCtx;
+ private final IReceptionist receptionist;
protected final IServiceContext serviceCtx;
protected final Function<IServletRequest, Map<String, String>> optionalParamProvider;
protected String hostName;
@@ -107,7 +108,7 @@
this.compilationProvider = compilationProvider;
this.statementExecutorFactory = statementExecutorFactory;
this.componentProvider = componentProvider;
- this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+ receptionist = appCtx.getReceptionist();
this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
this.optionalParamProvider = optionalParamProvider;
try {
@@ -345,7 +346,7 @@
pw.print("\t}\n");
}
- private String getOptText(JsonNode node, String fieldName) {
+ protected String getOptText(JsonNode node, String fieldName) {
final JsonNode value = node.get(fieldName);
return value != null ? value.asText() : null;
}
@@ -397,7 +398,7 @@
String contentType = HttpUtil.getContentTypeOnly(request);
if (HttpUtil.ContentType.APPLICATION_JSON.equals(contentType)) {
try {
- setParamFromJSON(request, param);
+ setParamFromJSON(request, param, optionalParams);
} catch (JsonParseException | JsonMappingException e) {
// if the JSON parsing fails, the statement is empty and we get an empty statement error
GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e);
@@ -407,7 +408,8 @@
}
}
- private void setParamFromJSON(IServletRequest request, QueryServiceRequestParameters param) throws IOException {
+ private void setParamFromJSON(IServletRequest request, QueryServiceRequestParameters param,
+ Map<String, String> optionalParameters) throws IOException {
JsonNode jsonRequest = OBJECT_MAPPER.readTree(HttpUtil.getRequestBody(request));
param.setFormat(toLower(getOptText(jsonRequest, Parameter.FORMAT.str())));
param.setPretty(getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false));
@@ -430,6 +432,11 @@
if (jsonRequest.has(statementParam)) {
param.setStatement(jsonRequest.get(statementParam).asText());
}
+ setJsonOptionalParameters(jsonRequest, optionalParameters);
+ }
+
+ protected void setJsonOptionalParameters(JsonNode jsonRequest, Map<String, String> optionalParameters) {
+ // allows extensions to set extra parameters
}
private void setParamFromRequest(IServletRequest request, QueryServiceRequestParameters param) throws IOException {
@@ -503,6 +510,7 @@
}
private void handleRequest(IServletRequest request, IServletResponse response) {
+ final IRequestReference requestRef = receptionist.welcome(request);
long elapsedStart = System.nanoTime();
long errorCount = 1;
Stats stats = new Stats();
@@ -527,7 +535,7 @@
final ResultProperties resultProperties = param.getMaxResultReads() == null ? new ResultProperties(delivery)
: new ResultProperties(delivery, Long.parseLong(param.getMaxResultReads()));
printAdditionalResultFields(sessionOutput.out());
- printRequestId(sessionOutput.out());
+ printRequestId(sessionOutput.out(), requestRef.getUuid());
printClientContextID(sessionOutput.out(), param);
if (!param.isParseOnly()) {
printSignature(sessionOutput.out(), param);
@@ -544,10 +552,9 @@
} else {
Map<String, byte[]> statementParams = org.apache.asterix.app.translator.RequestParameters
.serializeParameterValues(param.getStatementParams());
-
setAccessControlHeaders(request, response);
response.setStatus(execution.getHttpStatus());
- executeStatement(statementsText, sessionOutput, resultProperties, stats, param, execution,
+ executeStatement(requestRef, statementsText, sessionOutput, resultProperties, stats, param, execution,
optionalParams, statementParams);
if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
@@ -594,10 +601,10 @@
return parseOnlyResult;
}
- protected void executeStatement(String statementsText, SessionOutput sessionOutput,
- ResultProperties resultProperties, Stats stats, QueryServiceRequestParameters param,
- RequestExecutionState execution, Map<String, String> optionalParameters,
- Map<String, byte[]> statementParameters) throws Exception {
+ protected void executeStatement(IRequestReference requestReference, String statementsText,
+ SessionOutput sessionOutput, ResultProperties resultProperties, Stats stats,
+ QueryServiceRequestParameters param, RequestExecutionState execution,
+ Map<String, String> optionalParameters, Map<String, byte[]> statementParameters) throws Exception {
IClusterManagementWork.ClusterState clusterState =
((ICcApplicationContext) appCtx).getClusterStateManager().getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -612,10 +619,10 @@
execution.start();
Map<String, IAObject> stmtParams =
org.apache.asterix.app.translator.RequestParameters.deserializeParameterValues(statementParameters);
- IRequestParameters requestParameters =
- new org.apache.asterix.app.translator.RequestParameters(getResultSet(), resultProperties, stats, null,
- param.getClientContextID(), optionalParameters, stmtParams, param.isMultiStatement());
- translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters);
+ IRequestParameters requestParameters = new org.apache.asterix.app.translator.RequestParameters(requestReference,
+ statementsText, getResultSet(), resultProperties, stats, null, param.getClientContextID(),
+ optionalParameters, stmtParams, param.isMultiStatement());
+ translator.compileAndExecute(getHyracksClientConnection(), requestParameters);
execution.end();
printExecutionPlans(sessionOutput, translator.getExecutionPlans());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 0aa2211..520f85a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -26,6 +26,7 @@
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -176,16 +177,16 @@
response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
SessionOutput sessionOutput = initResponse(request, response);
QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
- doHandle(response, query, sessionOutput, resultDelivery);
+ final IRequestReference requestReference = appCtx.getReceptionist().welcome(request);
+ doHandle(requestReference, response, query, sessionOutput, resultDelivery);
} catch (Exception e) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
LOGGER.log(Level.WARN, "Failure handling request", e);
- return;
}
}
- private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput,
- ResultDelivery resultDelivery) throws JsonProcessingException {
+ private void doHandle(IRequestReference requestReference, IServletResponse response, String query,
+ SessionOutput sessionOutput, ResultDelivery resultDelivery) throws JsonProcessingException {
try {
response.setStatus(HttpResponseStatus.OK);
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
@@ -196,9 +197,9 @@
IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, sessionOutput,
compilationProvider, componentProvider);
final IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx);
- final IRequestParameters requestParameters = new RequestParameters(resultSet,
+ final IRequestParameters requestParameters = new RequestParameters(requestReference, query, resultSet,
new ResultProperties(resultDelivery), new IStatementExecutor.Stats(), null, null, null, null, true);
- translator.compileAndExecute(hcc, null, requestParameters);
+ translator.compileAndExecute(hcc, requestParameters);
} catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, pe.getMessage(), pe);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index 71b4b81..a5c8645 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -22,9 +22,11 @@
import java.io.Reader;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.asterix.api.common.APIFramework;
import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.utils.Job;
@@ -113,7 +115,8 @@
while ((ch = queryText.read()) != -1) {
builder.append((char) ch);
}
- IParser parser = parserFactory.createParser(builder.toString());
+ String statement = builder.toString();
+ IParser parser = parserFactory.createParser(statement);
List<Statement> statements = parser.parse();
MetadataManager.INSTANCE.init();
@@ -126,10 +129,12 @@
IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
- final IRequestParameters requestParameters =
- new RequestParameters(null, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE),
- new IStatementExecutor.Stats(), null, null, null, statementParams, true);
- translator.compileAndExecute(hcc, null, requestParameters);
+ final RequestReference requestReference =
+ RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis());
+ final IRequestParameters requestParameters = new RequestParameters(requestReference, statement, null,
+ new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(), null,
+ null, null, statementParams, true);
+ translator.compileAndExecute(hcc, requestParameters);
writer.flush();
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
index 9d15131..3ae18a7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java
@@ -46,10 +46,7 @@
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
- CCApplication application = (CCApplication) ccs.getApplication();
- IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
- final Collection<IClientRequest> runningRequests = executorsCtx.getRunningRequests().values();
+ final Collection<IClientRequest> runningRequests = appCtx.getRequestTracker().getRunningRequests();
final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new);
ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests);
CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 943aad3..e4cef7f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -19,14 +19,12 @@
package org.apache.asterix.app.message;
import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.utils.RequestStatus;
-import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.messaging.CCMessageBroker;
-import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,23 +45,24 @@
@Override
public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
- CCApplication application = (CCApplication) ccs.getApplication();
- IStatementExecutorContext executorsCtx = application.getStatementExecutorContext();
- IClientRequest req = executorsCtx.get(contextId);
+ final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ IClientRequest req = requestTracker.getByClientContextId(contextId);
RequestStatus status;
if (req == null) {
LOGGER.log(Level.WARN, "No job found for context id " + contextId);
status = RequestStatus.NOT_FOUND;
} else {
- try {
- req.cancel(appCtx);
- executorsCtx.remove(contextId);
- status = RequestStatus.SUCCESS;
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
- status = RequestStatus.FAILED;
+ if (!req.isCancellable()) {
+ status = RequestStatus.REJECTED;
+ } else {
+ try {
+ requestTracker.cancel(req.getId());
+ status = RequestStatus.SUCCESS;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
+ status = RequestStatus.FAILED;
+ }
}
}
CancelQueryResponse response = new CancelQueryResponse(reqId, status);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 94d63a4..fdc4432 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -30,6 +30,7 @@
import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.app.translator.RequestParameters;
import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -80,11 +81,12 @@
private final Map<String, String> optionalParameters;
private final Map<String, byte[]> statementParameters;
private final boolean multiStatement;
+ private final IRequestReference requestReference;
public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties,
String clientContextID, String handleUrl, Map<String, String> optionalParameters,
- Map<String, byte[]> statementParameters, boolean multiStatement) {
+ Map<String, byte[]> statementParameters, boolean multiStatement, IRequestReference requestReference) {
this.requestNodeId = requestNodeId;
this.requestMessageId = requestMessageId;
this.lang = lang;
@@ -96,6 +98,7 @@
this.optionalParameters = optionalParameters;
this.statementParameters = statementParameters;
this.multiStatement = multiStatement;
+ this.requestReference = requestReference;
}
@Override
@@ -113,7 +116,6 @@
ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory();
- IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext();
ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
try {
IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
@@ -132,9 +134,10 @@
compilationProvider, storageComponentProvider);
final IStatementExecutor.Stats stats = new IStatementExecutor.Stats();
Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters);
- final IRequestParameters requestParameters = new RequestParameters(null, resultProperties, stats,
- outMetadata, clientContextID, optionalParameters, stmtParams, multiStatement);
- translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters);
+ final IRequestParameters requestParameters =
+ new RequestParameters(requestReference, statementsText, null, resultProperties, stats, outMetadata,
+ clientContextID, optionalParameters, stmtParams, multiStatement);
+ translator.compileAndExecute(ccApp.getHcc(), requestParameters);
outPrinter.close();
responseMsg.setResult(outWriter.toString());
responseMsg.setMetadata(outMetadata);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 3e72b7d..724691c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -36,6 +36,8 @@
import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IPropertiesFactory;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IReceptionistFactory;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.AsterixExtension;
@@ -146,6 +148,7 @@
private IHyracksClientConnection hcc;
private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private IReplicaManager replicaManager;
+ private IReceptionist receptionist;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions,
IPropertiesFactory propertiesFactory) throws AsterixException, InstantiationException,
@@ -175,7 +178,8 @@
}
@Override
- public void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun) throws IOException {
+ public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
+ boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
threadExecutor =
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -215,6 +219,7 @@
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
this.ncServiceContext);
+ receptionist = receptionistFactory.create();
if (replicationProperties.isReplicationEnabled()) {
replicationManager = new ReplicationManager(this, replicationProperties);
@@ -533,4 +538,9 @@
public IPersistedResourceRegistry getPersistedResourceRegistry() {
return persistedResourceRegistry;
}
+
+ @Override
+ public IReceptionist getReceptionist() {
+ return receptionist;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b0f3287..2a1ed4f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -52,7 +52,9 @@
import org.apache.asterix.app.active.FeedEventsListener;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -150,7 +152,7 @@
import org.apache.asterix.om.types.TypeSignature;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
-import org.apache.asterix.translator.ClientJobRequest;
+import org.apache.asterix.translator.ClientRequest;
import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -160,7 +162,6 @@
import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil;
import org.apache.asterix.translator.IRequestParameters;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.NoOpStatementExecutorContext;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
@@ -261,30 +262,24 @@
}
@Override
- public void compileAndExecute(IHyracksClientConnection hcc, IStatementExecutorContext ctx,
- IRequestParameters requestParameters) throws Exception {
+ public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception {
if (!requestParameters.isMultiStatement()) {
validateStatements(statements);
}
+ trackRequest(requestParameters);
int resultSetIdCounter = 0;
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE;
- /*
- * Since the system runs a large number of threads, when HTTP requests don't
- * return, it becomes difficult to find the thread running the request to
- * determine where it has stopped. Setting the thread name helps make that
- * easier
- */
String threadName = Thread.currentThread().getName();
- Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
+ Thread.currentThread().setName(
+ QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid());
Map<String, String> config = new HashMap<>();
final IResultSet resultSet = requestParameters.getResultSet();
final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery();
final long maxResultReads = requestParameters.getResultProperties().getMaxReads();
final Stats stats = requestParameters.getStats();
final ResultMetadata outMetadata = requestParameters.getOutMetadata();
- final String clientContextId = requestParameters.getClientContextId();
final Map<String, IAObject> stmtParams = requestParameters.getStatementParameters();
try {
for (Statement stmt : statements) {
@@ -354,7 +349,7 @@
metadataProvider.setMaxResultReads(maxResultReads);
}
handleInsertUpsertStatement(metadataProvider, stmt, hcc, resultSet, resultDelivery, outMetadata,
- stats, false, clientContextId, stmtParams, stmtRewriter);
+ stats, false, requestParameters, stmtParams, stmtRewriter);
break;
case DELETE:
handleDeleteStatement(metadataProvider, stmt, hcc, false, stmtParams, stmtRewriter);
@@ -389,7 +384,7 @@
resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED);
metadataProvider.setMaxResultReads(maxResultReads);
handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats,
- clientContextId, ctx, stmtParams, stmtRewriter);
+ requestParameters, stmtParams, stmtRewriter);
break;
case COMPACT:
handleCompactStatement(metadataProvider, stmt, hcc);
@@ -406,8 +401,9 @@
// No op
break;
case EXTENSION:
+ //TODO remove deprecated statement executor context
((ExtensionStatement) stmt).handle(hcc, this, requestParameters, metadataProvider,
- resultSetIdCounter, ctx);
+ resultSetIdCounter, NoOpStatementExecutorContext.INSTANCE);
break;
default:
throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(),
@@ -415,6 +411,10 @@
}
}
} finally {
+ // async queries are completed after their job completes
+ if (ResultDelivery.ASYNC != resultDelivery) {
+ appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid());
+ }
Thread.currentThread().setName(threadName);
}
}
@@ -1856,7 +1856,7 @@
public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId,
+ ResultMetadata outMetadata, Stats stats, boolean compileOnly, IRequestParameters requestParameters,
Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
@@ -1901,7 +1901,7 @@
if (stmtInsertUpsert.getReturnExpression() != null) {
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- clientContextId, NoOpStatementExecutorContext.INSTANCE);
+ requestParameters, false);
} else {
locker.lock();
try {
@@ -2454,8 +2454,8 @@
protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats,
- String clientContextId, IStatementExecutorContext ctx, Map<String, IAObject> stmtParams,
- IStatementRewriter stmtRewriter) throws Exception {
+ IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter)
+ throws Exception {
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
@@ -2488,19 +2488,19 @@
}
};
deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats,
- clientContextId, ctx);
+ requestParameters, true);
}
private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler,
MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
+ ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable)
throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
- clientContextId, ctx, resultSetId, printed));
+ requestParameters, cancellable, resultSetId, printed));
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -2515,7 +2515,7 @@
sessionOutput.release();
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
- }, clientContextId, ctx);
+ }, requestParameters, cancellable, appCtx);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
@@ -2525,7 +2525,7 @@
outMetadata.getResultSets()
.add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType()));
}
- }, clientContextId, ctx);
+ }, requestParameters, cancellable, appCtx);
break;
default:
break;
@@ -2552,7 +2552,7 @@
}
private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker,
- ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx,
+ ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable,
ResultSetId resultSetId, MutableBoolean printed) {
Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID);
try {
@@ -2564,7 +2564,7 @@
printed.setTrue();
printed.notify();
}
- }, clientContextId, ctx);
+ }, requestParameters, cancellable, appCtx);
} catch (Exception e) {
if (Objects.equals(JobId.INVALID, jobId.getValue())) {
// compilation failed
@@ -2595,8 +2595,10 @@
private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer,
- String clientContextId, IStatementExecutorContext ctx) throws Exception {
- ClientJobRequest req = null;
+ IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception {
+ final IRequestTracker requestTracker = appCtx.getRequestTracker();
+ final ClientRequest clientRequest =
+ (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid());
locker.lock();
try {
final JobSpecification jobSpec = compiler.compile();
@@ -2604,9 +2606,9 @@
return;
}
final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
- if (ctx != null && clientContextId != null) {
- req = new ClientJobRequest(ctx, clientContextId, jobId);
- ctx.put(clientContextId, req); // Adds the running job into the context.
+ clientRequest.setJobId(jobId);
+ if (cancellable) {
+ clientRequest.markCancellable();
}
if (jId != null) {
jId.setValue(jobId);
@@ -2619,11 +2621,11 @@
printer.print(jobId);
}
} finally {
- locker.unlock();
- // No matter the job succeeds or fails, removes it into the context.
- if (req != null) {
- req.complete();
+ // complete async jobs after their job completes
+ if (ResultDelivery.ASYNC == resultDelivery) {
+ requestTracker.complete(clientRequest.getId());
}
+ locker.unlock();
}
}
@@ -2941,6 +2943,13 @@
}
}
+ protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException {
+ final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(
+ requestParameters.getRequestReference(), requestParameters.getClientContextId(),
+ requestParameters.getStatement(), requestParameters.getOptionalParameters());
+ appCtx.getRequestTracker().track(clientRequest);
+ }
+
public static void validateStatements(List<Statement> statements) throws CompilationException {
if (statements.stream().filter(QueryTranslator::isNotAllowedMultiStatement).count() > 1) {
throw new CompilationException(ErrorCode.UNSUPPORTED_MULTIPLE_STATEMENTS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
index d0adcda..eda8a4a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.asterix.common.api.IRequestReference;
import org.apache.asterix.external.parser.JSONDataParser;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.IAObject;
@@ -41,6 +42,7 @@
public class RequestParameters implements IRequestParameters {
+ private final IRequestReference requestReference;
private final IResultSet resultSet;
private final ResultProperties resultProperties;
private final Stats stats;
@@ -49,10 +51,14 @@
private final String clientContextId;
private final Map<String, IAObject> statementParameters;
private final boolean multiStatement;
+ private final String statement;
- public RequestParameters(IResultSet resultSet, ResultProperties resultProperties, Stats stats,
- IStatementExecutor.ResultMetadata outMetadata, String clientContextId,
- Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement) {
+ public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet,
+ ResultProperties resultProperties, Stats stats, IStatementExecutor.ResultMetadata outMetadata,
+ String clientContextId, Map<String, String> optionalParameters, Map<String, IAObject> statementParameters,
+ boolean multiStatement) {
+ this.requestReference = requestReference;
+ this.statement = statement;
this.resultSet = resultSet;
this.resultProperties = resultProperties;
this.stats = stats;
@@ -103,6 +109,16 @@
return statementParameters;
}
+ @Override
+ public String getStatement() {
+ return statement;
+ }
+
+ @Override
+ public IRequestReference getRequestReference() {
+ return requestReference;
+ }
+
public static Map<String, byte[]> serializeParameterValues(Map<String, JsonNode> inParams)
throws HyracksDataException {
if (inParams == null || inParams.isEmpty()) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 24a1463..ce98a03 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -33,7 +33,6 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.api.http.IQueryWebServerRegistrant;
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
import org.apache.asterix.api.http.server.ApiServlet;
import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
@@ -55,6 +54,8 @@
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.ExtensionProperties;
@@ -77,7 +78,6 @@
import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -110,7 +110,6 @@
protected ICCServiceContext ccServiceCtx;
protected CCExtensionManager ccExtensionManager;
protected IStorageComponentProvider componentProvider;
- protected StatementExecutorContext statementExecutorCtx;
protected WebManager webManager;
protected ICcApplicationContext appCtx;
private IJobCapacityController jobCapacityController;
@@ -154,8 +153,8 @@
extensions.addAll(getExtensions());
ccExtensionManager = new CCExtensionManager(extensions);
IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
- statementExecutorCtx = new StatementExecutorContext();
- appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator);
+ appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator,
+ () -> new Receptionist("CC"));
appCtx.setExtensionManager(ccExtensionManager);
final CCConfig ccConfig = controllerService.getCCConfig();
if (System.getProperty("java.rmi.server.hostname") == null) {
@@ -182,11 +181,11 @@
}
protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
- IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator)
- throws AlgebricksException, IOException {
+ IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator,
+ IReceptionistFactory receptionistFactory) throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager());
+ new MetadataLockManager(), receptionistFactory);
}
protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
@@ -243,7 +242,6 @@
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
ccServiceCtx.getControllerService().getExecutor());
- jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, statementExecutorCtx);
jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx);
// Other APIs.
@@ -322,10 +320,6 @@
return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
}
- public IStatementExecutorContext getStatementExecutorContext() {
- return statementExecutorCtx;
- }
-
@Override
public IJobCapacityController getJobCapacityController() {
return jobCapacityController;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 3857ac5..1112507 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -34,6 +34,8 @@
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IPropertiesFactory;
+import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.GlobalConfig;
@@ -128,7 +130,8 @@
}
updateOnNodeJoin();
}
- runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun());
+ runtimeContext.initialize(getRecoveryManagerFactory(), getReceptionistFactory(),
+ runtimeContext.getNodeProperties().isInitialRun());
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
@@ -155,6 +158,10 @@
return RecoveryManager::new;
}
+ protected IReceptionistFactory getReceptionistFactory() {
+ return () -> new Receptionist(nodeId);
+ }
+
@Override
protected void configureLoggingLevel(Level level) {
super.configureLoggingLevel(level);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index eae82af..1a526b2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -25,14 +25,15 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.common.api.RequestReference;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.translator.ClientJobRequest;
-import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.runtime.utils.RequestTracker;
+import org.apache.asterix.translator.ClientRequest;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.http.api.IServletRequest;
@@ -49,15 +50,14 @@
@Test
public void testDelete() throws Exception {
ICcApplicationContext appCtx = mock(ICcApplicationContext.class);
+ RequestTracker tracker = new RequestTracker(appCtx);
+ Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker);
// Creates a query cancellation servlet.
CcQueryCancellationServlet cancellationServlet =
new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" });
// Adds mocked Hyracks client connection into the servlet context.
IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class);
cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc);
- // Adds a query context into the servlet context.
- IStatementExecutorContext queryCtx = new StatementExecutorContext();
- cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
Mockito.when(appCtx.getHcc()).thenReturn(mockHcc);
// Tests the case that query is not in the map.
IServletRequest mockRequest = mockRequest("1");
@@ -65,8 +65,12 @@
cancellationServlet.handle(mockRequest, mockResponse);
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
+ final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis());
+ ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>());
+ request.setJobId(new JobId(1));
+ request.markCancellable();
+ tracker.track(request);
// Tests the case that query is in the map.
- queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1)));
cancellationServlet.handle(mockRequest, mockResponse);
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK);
@@ -76,7 +80,11 @@
verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST);
// Tests the case that the job cancellation hit some exception from Hyracks.
- queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2)));
+ final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis());
+ ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>());
+ request2.setJobId(new JobId(2));
+ request2.markCancellable();
+ tracker.track(request2);
Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
mockRequest = mockRequest("2");
cancellationServlet.handle(mockRequest, mockResponse);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 48d11ee..2a945e9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -70,7 +70,7 @@
Thread.sleep(10);
// Cancels the query request while the query is executing.
int rc = cancelQuery(getEndpoint(Servlets.RUNNING_REQUESTS), newParams);
- Assert.assertTrue(rc == 200 || rc == 404);
+ Assert.assertTrue(rc == 200 || rc == 404 || rc == 403);
if (rc == 200) {
break;
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
index 3a29ef2..7eee42e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp
@@ -18,5 +18,5 @@
*/
-- param client_context_id=ensure_running_query
-- polltimeoutsecs=15
-SELECT VALUE rqst FROM active_requests() rqst
+SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst
WHERE rqst.clientContextID = 'sleep_async_query';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
index 92f4746..e31fe3b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex
@@ -1 +1 @@
-/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/
\ No newline at end of file
+/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 6b5b472..9e5801f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -76,4 +76,6 @@
* @return the cluster coordination service.
*/
ICoordinationService getCoordinationService();
+
+ IReceptionist getReceptionist();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
index 53771d5..430cd2a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -24,11 +24,35 @@
public interface IClientRequest {
/**
+ * A system wide unique id representing this {@link IClientRequest}
+ *
+ * @return the system request id
+ */
+ String getId();
+
+ /**
+ * A user supplied id representing this {@link IClientRequest}
+ *
+ * @return the client supplied request id
+ */
+ String getClientContextId();
+
+ /**
* Mark the request as complete, non-cancellable anymore
*/
void complete();
/**
+ * Mark the request as cancellable
+ */
+ void markCancellable();
+
+ /**
+ * @return true if the request can be cancelled. Otherwise false.
+ */
+ boolean isCancellable();
+
+ /**
* Cancel a request
*
* @param appCtx
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8648c5b..c6e7439 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -67,8 +67,8 @@
IResourceIdFactory getResourceIdFactory();
- void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun)
- throws IOException, AlgebricksException;
+ void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
+ boolean initialRun) throws IOException, AlgebricksException;
void setShuttingdown(boolean b);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
new file mode 100644
index 0000000..51df306
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.http.api.IServletRequest;
+
+public interface IReceptionist {
+
+ /**
+ * Generates a request reference based on {@code request}
+ *
+ * @param request
+ * @return a request reference representing the request
+ */
+ IRequestReference welcome(IServletRequest request);
+
+ /**
+ * Generates a {@link IClientRequest} based on the requests parameters
+ *
+ * @param requestRef
+ * @param clientContextId
+ * @param statement
+ * @param getOptionalParameters
+ * @return A client request
+ * @throws HyracksDataException
+ */
+ IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement,
+ Map<String, String> getOptionalParameters) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java
new file mode 100644
index 0000000..6784f26
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+@FunctionalInterface
+public interface IReceptionistFactory {
+
+ /**
+ * Creates a {@link IReceptionist}
+ *
+ * @return a receptionist
+ */
+ IReceptionist create();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java
new file mode 100644
index 0000000..8a25ed2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.io.Serializable;
+
+public interface IRequestReference extends Serializable {
+
+ /**
+ * Gets the system wide unique request id.
+ *
+ * @return the requests id.
+ */
+ String getUuid();
+
+ /**
+ * Get the node name which received this requests.
+ *
+ * @return the node name
+ */
+ String getNode();
+
+ /**
+ * Gets the system time at which the request was received.
+ *
+ * @return the time at which the request was received.
+ */
+ long getTime();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
new file mode 100644
index 0000000..01bcf82
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.Collection;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IRequestTracker {
+
+ /**
+ * Starts tracking {@code request}
+ *
+ * @param request
+ */
+ void track(IClientRequest request);
+
+ /**
+ * Gets a client request by {@code requestId}
+ *
+ * @param requestId
+ * @return the client request if found. Otherwise null.
+ */
+ IClientRequest get(String requestId);
+
+ /**
+ * Gets a client request by {@code clientContextId}
+ *
+ * @param clientContextId
+ * @return the client request if found. Otherwise null.
+ */
+ IClientRequest getByClientContextId(String clientContextId);
+
+ /**
+ * Cancels the client request with id {@code requestId} if found.
+ *
+ * @param requestId
+ * @throws HyracksDataException
+ */
+ void cancel(String requestId) throws HyracksDataException;
+
+ /**
+ * Completes the request with id {@code requestId}
+ *
+ * @param requestId
+ */
+ void complete(String requestId);
+
+ /**
+ * Gets the currently running requests
+ *
+ * @return the currently running requests
+ */
+ Collection<IClientRequest> getRunningRequests();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java
new file mode 100644
index 0000000..eb08c09
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class RequestReference implements IRequestReference {
+
+ private static final long serialVersionUID = 1L;
+ private String uuid;
+ private String node;
+ private long time;
+ private String userAgent;
+ private String remoteAddr;
+
+ private RequestReference(String uuid, String node, long time) {
+ this.uuid = uuid;
+ this.node = node;
+ this.time = time;
+ }
+
+ public static RequestReference of(String uuid, String node, long time) {
+ return new RequestReference(uuid, node, time);
+ }
+
+ @Override
+ public String getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public long getTime() {
+ return time;
+ }
+
+ public String getNode() {
+ return node;
+ }
+
+ public String getUserAgent() {
+ return userAgent;
+ }
+
+ public void setUserAgent(String userAgent) {
+ this.userAgent = userAgent;
+ }
+
+ public void setRemoteAddr(String remoteAddr) {
+ this.remoteAddr = remoteAddr;
+ }
+
+ public String getRemoteAddr() {
+ return remoteAddr;
+ }
+
+ @Override
+ public String toString() {
+ final ObjectNode object = JSONUtil.createObject();
+ object.put("uuid", uuid);
+ object.put("node", node);
+ object.put("time", time);
+ object.put("userAgent", userAgent);
+ object.put("remoteAddr", remoteAddr);
+ return JSONUtil.convertNodeOrThrow(object);
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 81ae3e1..e4b70f6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ExtensionProperties;
@@ -133,4 +134,11 @@
* @return the compression manager
*/
ICompressionManager getCompressionManager();
+
+ /**
+ * Gets the request tracker.
+ *
+ * @return the request tracker.
+ */
+ IRequestTracker getRequestTracker();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java
index 52dfe90..741af83 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java
@@ -23,7 +23,8 @@
public enum RequestStatus {
SUCCESS,
FAILED,
- NOT_FOUND;
+ NOT_FOUND,
+ REJECTED;
public HttpResponseStatus toHttpResponse() {
switch (this) {
@@ -33,6 +34,8 @@
return HttpResponseStatus.INTERNAL_SERVER_ERROR;
case NOT_FOUND:
return HttpResponseStatus.NOT_FOUND;
+ case REJECTED:
+ return HttpResponseStatus.FORBIDDEN;
default:
throw new IllegalStateException("Unrecognized status: " + this);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 48463e8..b92a15e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,6 +24,9 @@
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
+import org.apache.asterix.common.api.IReceptionist;
+import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.api.IRequestTracker;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ActiveProperties;
@@ -90,12 +93,15 @@
private final INodeJobTracker nodeJobTracker;
private final ITxnIdFactory txnIdFactory;
private final ICompressionManager compressionManager;
+ private final IReceptionist receptionist;
+ private final IRequestTracker requestTracker;
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager) throws AlgebricksException, IOException {
+ IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory)
+ throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.libraryManager = libraryManager;
@@ -125,7 +131,8 @@
nodeJobTracker = new NodeJobTracker();
txnIdFactory = new BulkTxnIdFactory();
compressionManager = new CompressionManager(storageProperties);
-
+ receptionist = receptionistFactory.create();
+ requestTracker = new RequestTracker(this);
}
@Override
@@ -283,4 +290,14 @@
public ICompressionManager getCompressionManager() {
return compressionManager;
}
+
+ @Override
+ public IReceptionist getReceptionist() {
+ return receptionist;
+ }
+
+ @Override
+ public IRequestTracker getRequestTracker() {
+ return requestTracker;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
new file mode 100644
index 0000000..f651eb36
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.api.IRequestTracker;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class RequestTracker implements IRequestTracker {
+
+ private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>();
+ private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>();
+ private final ICcApplicationContext ccAppCtx;
+
+ public RequestTracker(ICcApplicationContext ccAppCtx) {
+ this.ccAppCtx = ccAppCtx;
+ }
+
+ @Override
+ public IClientRequest get(String requestId) {
+ return runningRequests.get(requestId);
+ }
+
+ @Override
+ public IClientRequest getByClientContextId(String clientContextId) {
+ return clientIdRequests.get(clientContextId);
+ }
+
+ @Override
+ public void track(IClientRequest request) {
+ runningRequests.put(request.getId(), request);
+ if (request.getClientContextId() != null) {
+ clientIdRequests.put(request.getClientContextId(), request);
+ }
+ }
+
+ @Override
+ public void cancel(String requestId) throws HyracksDataException {
+ final IClientRequest request = runningRequests.get(requestId);
+ if (request == null) {
+ return;
+ }
+ if (!request.isCancellable()) {
+ throw new IllegalStateException("Request " + request.getId() + " cannot be cancelled");
+ }
+ cancel(request);
+ }
+
+ @Override
+ public void complete(String requestId) {
+ final IClientRequest request = runningRequests.get(requestId);
+ if (request != null) {
+ request.complete();
+ untrack(request);
+ }
+ }
+
+ @Override
+ public synchronized Collection<IClientRequest> getRunningRequests() {
+ return Collections.unmodifiableCollection(runningRequests.values());
+ }
+
+ private void cancel(IClientRequest request) throws HyracksDataException {
+ request.cancel(ccAppCtx);
+ untrack(request);
+ }
+
+ private void untrack(IClientRequest request) {
+ runningRequests.remove(request.getId());
+ final String clientContextId = request.getClientContextId();
+ if (clientContextId != null) {
+ clientIdRequests.remove(request.getClientContextId());
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
index 006659b..7decbe0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java
@@ -63,6 +63,14 @@
return PRETTY_SORTED_WRITER.writeValueAsString(SORTED_MAPPER.treeToValue(node, Object.class));
}
+ public static String convertNodeOrThrow(final JsonNode node) {
+ try {
+ return convertNode(node);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
public static void writeNode(final Writer writer, final JsonNode node) throws IOException {
PRETTY_SORTED_WRITER.writeValue(writer, SORTED_MAPPER.treeToValue(node, Object.class));
}