[ASTERIXDB-2033][OTH] restrict chunk size in http server

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- This change ensures no large chunks are written to
  direct buffers in the http server.

Change-Id: I08bac47ea28f66502b99df6fb8ff91dd85566d38
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1935
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 835cf61..6439adb 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -16,7 +16,8 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.hyracks</groupId>
@@ -27,6 +28,17 @@
   <properties>
     <root.dir>${basedir}/../..</root.dir>
   </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <argLine>-XX:MaxDirectMemorySize=16M</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
   <dependencies>
     <dependency>
       <groupId>io.netty</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 4a4c25a..30d599d 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -88,6 +88,12 @@
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Unhandled exception", e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        } catch (Throwable th) { //NOSONAR Just logging and then throwing again
+            try {
+                LOGGER.log(Level.WARNING, "Unhandled throwable", th);
+            } catch (Throwable loggingFailure) {// NOSONAR... swallow logging failure
+            }
+            throw th;
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 3456343..0066b77 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -27,6 +27,7 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.internal.OutOfDirectMemoryError;
 
 public class ChunkedNettyOutputStream extends OutputStream {
 
@@ -50,23 +51,25 @@
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) {
-            throw new IndexOutOfBoundsException();
-        } else if (len == 0) {
-            return;
-        }
-        if (len > buffer.capacity()) {
-            flush();
-            flush(b, off, len);
-        } else {
-            int space = buffer.writableBytes();
-            if (space >= len) {
-                buffer.writeBytes(b, off, len);
-            } else {
-                buffer.writeBytes(b, off, space);
-                flush();
-                buffer.writeBytes(b, off + space, len - space);
+        try {
+            if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) {
+                throw new IndexOutOfBoundsException();
             }
+            while (len > 0) {
+                int space = buffer.writableBytes();
+                if (space >= len) {
+                    buffer.writeBytes(b, off, len);
+                    len = 0; // NOSONAR
+                } else {
+                    buffer.writeBytes(b, off, space);
+                    len -= space; // NOSONAR
+                    off += space; // NOSONAR
+                    flush();
+                }
+            }
+        } catch (OutOfDirectMemoryError error) {// NOSONAR
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            throw error;
         }
     }
 
@@ -114,18 +117,6 @@
         }
     }
 
-    private void flush(byte[] buf, int offset, int len) throws IOException {
-        ensureWritable();
-        ByteBuf aBuffer = ctx.alloc().buffer(len);
-        aBuffer.writeBytes(buf, offset, len);
-        if (response.status() == HttpResponseStatus.OK) {
-            response.beforeFlush();
-            ctx.write(new DefaultHttpContent(aBuffer), ctx.channel().voidPromise());
-        } else {
-            response.error(aBuffer);
-        }
-    }
-
     private synchronized void ensureWritable() throws IOException {
         while (!ctx.channel().isWritable()) {
             try {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
new file mode 100644
index 0000000..30df003
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.servlet;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class ChattyServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName());
+    private byte[] bytes;
+
+    public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+        super(ctx, paths);
+        String line = "I don't know when to stop talking\n";
+        StringBuilder responseBuilder = new StringBuilder();
+        for (int i = 0; i < 100000; i++) {
+            responseBuilder.append(line);
+        }
+        String responseString = responseBuilder.toString();
+        bytes = responseString.getBytes();
+    }
+
+    @Override
+    protected void get(IServletRequest request, IServletResponse response) throws Exception {
+        response.setStatus(HttpResponseStatus.OK);
+        HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+        LOGGER.log(Level.WARNING, "I am about to flood you... and a single buffer is " + bytes.length + " bytes");
+        for (int i = 0; i < 100; i++) {
+            response.outputStream().write(bytes);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 2076201..854980e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -41,13 +41,16 @@
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.servlet.ChattyServlet;
 import org.apache.hyracks.http.servlet.SlowServlet;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class HttpServerTest {
+    static final boolean PRINT_TO_CONSOLE = false;
     static final int PORT = 9898;
     static final int NUM_EXECUTOR_THREADS = 16;
     static final int SERVER_QUEUE_SIZE = 16;
@@ -60,6 +63,13 @@
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
     static final List<Thread> THREADS = new ArrayList<>();
 
+    @Before
+    public void setUp() {
+        SUCCESS_COUNT.set(0);
+        UNAVAILABLE_COUNT.set(0);
+        OTHER_COUNT.set(0);
+    }
+
     @Test
     public void testOverloadingServer() throws Exception {
         WebManager webMgr = new WebManager();
@@ -83,6 +93,31 @@
     }
 
     @Test
+    public void testChattyServer() throws Exception {
+        int numRequests = 64;
+        int numExecutors = 32;
+        int serverQueueSize = 32;
+        WebManager webMgr = new WebManager();
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+        ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            request(numRequests);
+            for (Thread thread : THREADS) {
+                thread.join();
+            }
+            Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
+            Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
+            Assert.assertEquals(0, OTHER_COUNT.get());
+        } finally {
+            webMgr.stop();
+        }
+    }
+
+    @Test
     public void testMalformedString() throws Exception {
         WebManager webMgr = new WebManager();
         HttpServer server =
@@ -136,8 +171,15 @@
                     } else {
                         OTHER_COUNT.incrementAndGet();
                     }
-                    InputStream responseStream = response.getEntity().getContent();
-                    IOUtils.closeQuietly(responseStream);
+                    InputStream in = response.getEntity().getContent();
+                    if (PRINT_TO_CONSOLE) {
+                        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+                        String line = null;
+                        while ((line = reader.readLine()) != null) {
+                            System.out.println(line);
+                        }
+                    }
+                    IOUtils.closeQuietly(in);
                 } catch (Throwable th) {
                     th.printStackTrace();
                 }