[ASTERIXDB-2165] Avoid OOM in QueryServiceServlet

- user model changes: no
- storage format changes: no
- interface change: no

Change-Id: I74f61941f2e75e10f2accd6b2e6be6c1c0cd1490
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2150
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
index b559df8..f7031a4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.translator;
 
 import java.io.PrintWriter;
+import java.io.StringWriter;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -29,6 +30,8 @@
 
     // Output path for primary execution.
     private final PrintWriter out;
+    private StringWriter buffer;
+    private PrintWriter bufferedOut;
 
     private final SessionOutput.ResultDecorator preResultDecorator;
     private final SessionOutput.ResultDecorator postResultDecorator;
@@ -53,7 +56,31 @@
      * Retrieve the PrintWriter to produce output to.
      */
     public PrintWriter out() {
-        return this.out;
+        return this.bufferedOut != null ? this.bufferedOut : this.out;
+    }
+
+    /**
+     * buffer the data provided to the PrintWriter returned by out() to be able to set the status of the response
+     * message when it can be determined. This is a no-op, if data is already buffered.
+     */
+    public void hold() {
+        if (this.bufferedOut == null) {
+            this.buffer = new StringWriter();
+            this.bufferedOut = new PrintWriter(this.buffer);
+        }
+    }
+
+    /**
+     * release the data that was buffered by calling hold() and remove the buffer from the pipeline.
+     * This is a no-op, if data is not buffered.
+     */
+    public void release() {
+        if (this.bufferedOut != null) {
+            this.bufferedOut.flush();
+            this.out.write(buffer.toString());
+            this.bufferedOut = null;
+            this.buffer = null;
+        }
     }
 
     public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 616c22e..64ea73d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -116,6 +116,8 @@
                 throw new Exception(err.toString(), err);
             }
         }
+        // no errors - stop buffering and allow for streaming result delivery
+        sessionOutput.release();
 
         IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata();
         if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index f8f5c18..42c9edd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -381,25 +381,25 @@
         RequestParameters param = getRequestParameters(request);
         LOGGER.info(param.toString());
         long elapsedStart = System.nanoTime();
-        final StringWriter stringWriter = new StringWriter();
-        final PrintWriter resultWriter = new PrintWriter(stringWriter);
+        final PrintWriter httpWriter = response.writer();
 
         ResultDelivery delivery = parseResultDelivery(param.mode);
 
         String handleUrl = getHandleUrl(param.host, param.path, delivery);
-        SessionOutput sessionOutput = createSessionOutput(param, handleUrl, resultWriter);
+        SessionOutput sessionOutput = createSessionOutput(param, handleUrl, httpWriter);
         SessionConfig sessionConfig = sessionOutput.config();
         HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
-        HttpResponseStatus status = HttpResponseStatus.OK;
         Stats stats = new Stats();
         long[] execStartEnd = new long[] { -1, -1 };
 
-        resultWriter.print("{\n");
-        printRequestId(resultWriter);
-        printClientContextID(resultWriter, param);
-        printSignature(resultWriter);
-        printType(resultWriter, sessionConfig);
+        // buffer the output until we are ready to set the status of the response message correctly
+        sessionOutput.hold();
+        sessionOutput.out().print("{\n");
+        printRequestId(sessionOutput.out());
+        printClientContextID(sessionOutput.out(), param);
+        printSignature(sessionOutput.out());
+        printType(sessionOutput.out(), sessionConfig);
         long errorCount = 1; // so far we just return 1 error
         try {
             if (param.statement == null || param.statement.isEmpty()) {
@@ -410,33 +410,30 @@
             if (optionalParamProvider != null) {
                 optionalParams = optionalParamProvider.apply(request);
             }
+            response.setStatus(HttpResponseStatus.OK);
             executeStatement(statementsText, sessionOutput, delivery, stats, param, execStartEnd, optionalParams);
             if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
                 ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
             }
             errorCount = 0;
         } catch (Exception | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError e) {
-            status = handleExecuteStatementException(e);
-            ResultUtil.printError(resultWriter, e);
+            response.setStatus(handleExecuteStatementException(e));
+            ResultUtil.printError(sessionOutput.out(), e);
             ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
         } finally {
+            // make sure that we stop buffering and return the result to the http response
+            sessionOutput.release();
             if (execStartEnd[0] == -1) {
                 execStartEnd[1] = -1;
             } else if (execStartEnd[1] == -1) {
                 execStartEnd[1] = System.nanoTime();
             }
         }
-        printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
+        printMetrics(sessionOutput.out(), System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0],
                 stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount);
-        resultWriter.print("}\n");
-        resultWriter.flush();
-        String result = stringWriter.toString();
-
-        GlobalConfig.ASTERIX_LOGGER.log(Level.FINE, result);
-
-        response.setStatus(status);
-        response.writer().print(result);
-        if (response.writer().checkError()) {
+        sessionOutput.out().print("}\n");
+        sessionOutput.out().flush();
+        if (sessionOutput.out().checkError()) {
             LOGGER.warning("Error flushing output writer");
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a52d765..05debaa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2395,6 +2395,8 @@
                 createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> {
                     final ResultReader resultReader = new ResultReader(hdc, id, resultSetId);
                     updateJobStats(id, stats);
+                    // stop buffering and allow for streaming result delivery
+                    sessionOutput.release();
                     ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats,
                             metadataProvider.findOutputRecordType());
                 }, clientContextId, ctx);