[ASTERIXDB-2190][API] fix timeout behavior
- user model changes: no
- storage format changes: no
- interface changes: no
The timeout parameter is now handled for json and urlencoded requests,
the result status for a request that has timed out is "timeout", and
invalid timeout values result in an error.
Change-Id: Ide0515dc8ef9f8c295e1dc2ffde297100634060a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2199
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 64ea73d..cd1064b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -68,12 +68,13 @@
@Override
protected void executeStatement(String statementsText, SessionOutput sessionOutput,
IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
- long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
+ RequestExecutionState execution, Map<String, String> optionalParameters) throws Exception {
// Running on NC -> send 'execute' message to CC
INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
- IStatementExecutor.ResultDelivery ccDelivery = delivery == IStatementExecutor.ResultDelivery.IMMEDIATE
- ? IStatementExecutor.ResultDelivery.DEFERRED : delivery;
+ IStatementExecutor.ResultDelivery ccDelivery =
+ delivery == IStatementExecutor.ResultDelivery.IMMEDIATE ? IStatementExecutor.ResultDelivery.DEFERRED
+ : delivery;
ExecuteStatementResponseMessage responseMsg;
MessageFuture responseFuture = ncMb.registerMessageFuture();
final String handleUrl = getHandleUrl(param.host, param.path, delivery);
@@ -82,13 +83,13 @@
param.clientContextID = UUID.randomUUID().toString();
}
long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
- if (param.timeout != null) {
+ if (param.timeout != null && !param.timeout.trim().isEmpty()) {
timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
}
ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery,
param.clientContextID, handleUrl, optionalParameters);
- outExecStartEnd[0] = System.nanoTime();
+ execution.start();
ncMb.sendMessageToCC(requestMsg);
try {
responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, TimeUnit.MILLISECONDS);
@@ -96,12 +97,13 @@
cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, e, false);
throw e;
} catch (TimeoutException exception) {
- RuntimeDataException hde = new RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception);
+ RuntimeDataException hde = new RuntimeDataException(ErrorCode.QUERY_TIMEOUT);
+ hde.addSuppressed(exception);
// cancel query
cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, hde, true);
throw hde;
}
- outExecStartEnd[1] = System.nanoTime();
+ execution.end();
} finally {
ncMb.deregisterMessageFuture(responseFuture.getFutureId());
}
@@ -131,8 +133,8 @@
}
}
- private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID,
- Exception exception, boolean wait) {
+ private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID, Exception exception,
+ boolean wait) {
MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture();
try {
CancelQueryRequest cancelQueryMessage =
@@ -150,13 +152,13 @@
}
@Override
- protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+ protected void handleExecuteStatementException(Throwable t, RequestExecutionState execution) {
if (t instanceof TimeoutException
|| (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) {
GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
- return HttpResponseStatus.SERVICE_UNAVAILABLE;
+ execution.setStatus(ResultStatus.FAILED, HttpResponseStatus.SERVICE_UNAVAILABLE);
} else {
- return super.handleExecuteStatementException(t);
+ super.handleExecuteStatementException(t, execution);
}
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 42c9edd..0b6151f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -36,6 +35,7 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.aql.parser.TokenMgrError;
import org.apache.asterix.lang.common.base.IParser;
@@ -68,7 +68,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
public class QueryServiceServlet extends AbstractQueryApiServlet {
- private static final Logger LOGGER = Logger.getLogger(QueryServiceServlet.class.getName());
+ protected static final Logger LOGGER = Logger.getLogger(QueryServiceServlet.class.getName());
protected final ILangExtension.Language queryLanguage;
private final ILangCompilationProvider compilationProvider;
private final IStatementExecutorFactory statementExecutorFactory;
@@ -184,6 +184,7 @@
on.put("mode", mode);
on.put("clientContextID", clientContextID);
on.put("format", format);
+ on.put("timeout", timeout);
return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on);
} catch (JsonProcessingException e) { // NOSONAR
return e.getMessage();
@@ -191,6 +192,46 @@
}
}
+ static final class RequestExecutionState {
+ private long execStart = -1;
+ private long execEnd = -1;
+ private ResultStatus resultStatus = ResultStatus.SUCCESS;
+ private HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK;
+
+ void setStatus(ResultStatus resultStatus, HttpResponseStatus httpResponseStatus) {
+ this.resultStatus = resultStatus;
+ this.httpResponseStatus = httpResponseStatus;
+ }
+
+ ResultStatus getResultStatus() {
+ return resultStatus;
+ }
+
+ HttpResponseStatus getHttpStatus() {
+ return httpResponseStatus;
+ }
+
+ void start() {
+ execStart = System.nanoTime();
+ }
+
+ void end() {
+ execEnd = System.nanoTime();
+ }
+
+ void finish() {
+ if (execStart == -1) {
+ execEnd = -1;
+ } else if (execEnd == -1) {
+ execEnd = System.nanoTime();
+ }
+ }
+
+ long duration() {
+ return execEnd - execStart;
+ }
+ }
+
private static String getParameterValue(String content, String attribute) {
if (content == null || attribute == null) {
return null;
@@ -289,7 +330,6 @@
ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, true);
pw.print("\t");
ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), processedObjects, hasErrors);
- pw.print("\t");
if (hasErrors) {
pw.print("\t");
ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), errorCount, false);
@@ -334,6 +374,7 @@
param.pretty = Boolean.parseBoolean(request.getParameter(Parameter.PRETTY.str()));
param.mode = toLower(request.getParameter(Parameter.MODE.str()));
param.clientContextID = request.getParameter(Parameter.CLIENT_ID.str());
+ param.timeout = request.getParameter(Parameter.TIMEOUT.str());
}
return param;
}
@@ -391,7 +432,7 @@
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
Stats stats = new Stats();
- long[] execStartEnd = new long[] { -1, -1 };
+ RequestExecutionState execution = new RequestExecutionState();
// buffer the output until we are ready to set the status of the response message correctly
sessionOutput.hold();
@@ -410,27 +451,24 @@
if (optionalParamProvider != null) {
optionalParams = optionalParamProvider.apply(request);
}
- response.setStatus(HttpResponseStatus.OK);
- executeStatement(statementsText, sessionOutput, delivery, stats, param, execStartEnd, optionalParams);
+ response.setStatus(execution.getHttpStatus());
+ executeStatement(statementsText, sessionOutput, delivery, stats, param, execution, optionalParams);
if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
- ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
+ ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
}
errorCount = 0;
} catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) {
- response.setStatus(handleExecuteStatementException(e));
+ handleExecuteStatementException(e, execution);
+ response.setStatus(execution.getHttpStatus());
ResultUtil.printError(sessionOutput.out(), e);
- ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
+ ResultUtil.printStatus(sessionOutput, execution.getResultStatus());
} finally {
// make sure that we stop buffering and return the result to the http response
sessionOutput.release();
- if (execStartEnd[0] == -1) {
- execStartEnd[1] = -1;
- } else if (execStartEnd[1] == -1) {
- execStartEnd[1] = System.nanoTime();
- }
+ execution.finish();
}
- printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
- stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount);
+ printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, execution.duration(), stats.getCount(),
+ stats.getSize(), stats.getProcessedObjects(), errorCount);
sessionOutput.out().print("}\n");
sessionOutput.out().flush();
if (sessionOutput.out().checkError()) {
@@ -439,7 +477,7 @@
}
protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
- IStatementExecutor.Stats stats, RequestParameters param, long[] outExecStartEnd,
+ IStatementExecutor.Stats stats, RequestParameters param, RequestExecutionState execution,
Map<String, String> optionalParameters) throws Exception {
IClusterManagementWork.ClusterState clusterState =
((ICcApplicationContext) appCtx).getClusterStateManager().getState();
@@ -452,25 +490,29 @@
MetadataManager.INSTANCE.init();
IStatementExecutor translator = statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
sessionOutput, compilationProvider, componentProvider);
- outExecStartEnd[0] = System.nanoTime();
+ execution.start();
final IRequestParameters requestParameters =
new org.apache.asterix.app.translator.RequestParameters(getHyracksDataset(), delivery, stats, null,
param.clientContextID, optionalParameters);
translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters);
- outExecStartEnd[1] = System.nanoTime();
+ execution.end();
}
- protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
+ protected void handleExecuteStatementException(Throwable t, RequestExecutionState execution) {
if (t instanceof org.apache.asterix.aqlplus.parser.TokenMgrError || t instanceof TokenMgrError
|| t instanceof AlgebricksException) {
GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, t.getMessage(), t);
- return HttpResponseStatus.BAD_REQUEST;
+ execution.setStatus(ResultStatus.FATAL, HttpResponseStatus.BAD_REQUEST);
} else if (t instanceof HyracksException) {
GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.getMessage(), t);
- return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+ if (((HyracksException) t).getErrorCode() == ErrorCode.QUERY_TIMEOUT) {
+ execution.setStatus(ResultStatus.TIMEOUT, HttpResponseStatus.OK);
+ } else {
+ execution.setStatus(ResultStatus.FATAL, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ }
} else {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", t);
- return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+ execution.setStatus(ResultStatus.FATAL, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index 72d82e0..ccbf68d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -130,8 +130,9 @@
public static void printError(PrintWriter pw, String msg, int code, boolean comma) {
pw.print("\t\"");
pw.print(AbstractQueryApiServlet.ResultFields.ERRORS.str());
- pw.print("\": [{ \n");
+ pw.print("\": [{ \n\t");
printField(pw, QueryServiceServlet.ErrorField.CODE.str(), code);
+ pw.print("\t");
printField(pw, QueryServiceServlet.ErrorField.MSG.str(), JSONUtil.escape(msg), false);
pw.print(comma ? "\t}],\n" : "\t}]\n");
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 61d8291..5a7cbc3 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -61,7 +61,7 @@
25 = Polygon must have at least 3 points
26 = %1$s can not be an instance of polygon
27 = Operation not supported
-28 = Invalid duration %1$s
+28 = Invalid duration \"%1$s\"
29 = Unknown duration unit %1$s
30 = Query timed out and will be cancelled
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 330cf1f..1f82a17 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -41,7 +41,7 @@
22 = The distributed job %1$s already exists
23 = The distributed work failed for %1$s at %2$s
24 = No result set for job %1$s
-25 = Job %1$s has been cancelled by a user
+25 = Job %1$s has been cancelled
26 = Node %1$s failed
27 = File %1$s is not a directory
28 = User doesn't have read permissions on the file %1$s