[ASTERIXDB-2165] Avoid OOM in QueryServiceServlet
- user model changes: no
- storage format changes: no
- interface change: no
Change-Id: I74f61941f2e75e10f2accd6b2e6be6c1c0cd1490
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2150
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
index b559df8..f7031a4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -20,6 +20,7 @@
package org.apache.asterix.translator;
import java.io.PrintWriter;
+import java.io.StringWriter;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -29,6 +30,8 @@
// Output path for primary execution.
private final PrintWriter out;
+ private StringWriter buffer;
+ private PrintWriter bufferedOut;
private final SessionOutput.ResultDecorator preResultDecorator;
private final SessionOutput.ResultDecorator postResultDecorator;
@@ -53,7 +56,31 @@
* Retrieve the PrintWriter to produce output to.
*/
public PrintWriter out() {
- return this.out;
+ return this.bufferedOut != null ? this.bufferedOut : this.out;
+ }
+
+ /**
+ * buffer the data provided to the PrintWriter returned by out() to be able to set the status of the response
+ * message when it can be determined. This is a no-op, if data is already buffered.
+ */
+ public void hold() {
+ if (this.bufferedOut == null) {
+ this.buffer = new StringWriter();
+ this.bufferedOut = new PrintWriter(this.buffer);
+ }
+ }
+
+ /**
+ * release the data that was buffered by calling hold() and remove the buffer from the pipeline.
+ * This is a no-op, if data is not buffered.
+ */
+ public void release() {
+ if (this.bufferedOut != null) {
+ this.bufferedOut.flush();
+ this.out.write(buffer.toString());
+ this.bufferedOut = null;
+ this.buffer = null;
+ }
}
public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 616c22e..64ea73d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -116,6 +116,8 @@
throw new Exception(err.toString(), err);
}
}
+ // no errors - stop buffering and allow for streaming result delivery
+ sessionOutput.release();
IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index f8f5c18..42c9edd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -381,25 +381,25 @@
RequestParameters param = getRequestParameters(request);
LOGGER.info(param.toString());
long elapsedStart = System.nanoTime();
- final StringWriter stringWriter = new StringWriter();
- final PrintWriter resultWriter = new PrintWriter(stringWriter);
+ final PrintWriter httpWriter = response.writer();
ResultDelivery delivery = parseResultDelivery(param.mode);
String handleUrl = getHandleUrl(param.host, param.path, delivery);
- SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter);
+ SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter);
SessionConfig sessionConfig = sessionOutput.config();
HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
- HttpResponseStatus status = HttpResponseStatus.OK;
Stats stats = new Stats();
long[] execStartEnd = new long[] { -1, -1 };
- resultWriter.print("{\n");
- printRequestId(resultWriter);
- printClientContextID(resultWriter, param);
- printSignature(resultWriter);
- printType(resultWriter, sessionConfig);
+ // buffer the output until we are ready to set the status of the response message correctly
+ sessionOutput.hold();
+ sessionOutput.out().print("{\n");
+ printRequestId(sessionOutput.out());
+ printClientContextID(sessionOutput.out(), param);
+ printSignature(sessionOutput.out());
+ printType(sessionOutput.out(), sessionConfig);
long errorCount = 1; // so far we just return 1 error
try {
if (param.statement == null || param.statement.isEmpty()) {
@@ -410,33 +410,30 @@
if (optionalParamProvider != null) {
optionalParams = optionalParamProvider.apply(request);
}
+ response.setStatus(HttpResponseStatus.OK);
executeStatement(statementsText, sessionOutput, delivery, stats, param, execStartEnd, optionalParams);
if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
}
errorCount = 0;
} catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) {
- status = handleExecuteStatementException(e);
- ResultUtil.printError(resultWriter, e);
+ response.setStatus(handleExecuteStatementException(e));
+ ResultUtil.printError(sessionOutput.out(), e);
ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
} finally {
+ // make sure that we stop buffering and return the result to the http response
+ sessionOutput.release();
if (execStartEnd[0] == -1) {
execStartEnd[1] = -1;
} else if (execStartEnd[1] == -1) {
execStartEnd[1] = System.nanoTime();
}
}
- printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
+ printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount);
- resultWriter.print("}\n");
- resultWriter.flush();
- String result = stringWriter.toString();
-
- GlobalConfig.ASTERIX_LOGGER.log(Level.FINE, result);
-
- response.setStatus(status);
- response.writer().print(result);
- if (response.writer().checkError()) {
+ sessionOutput.out().print("}\n");
+ sessionOutput.out().flush();
+ if (sessionOutput.out().checkError()) {
LOGGER.warning("Error flushing output writer");
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a52d765..05debaa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2395,6 +2395,8 @@
createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
updateJobStats(id, stats);
+ // stop buffering and allow for streaming result delivery
+ sessionOutput.release();
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
metadataProvider.findOutputRecordType());
}, clientContextId, ctx);