[ASTERIXDB-2052][OTH] Release resources on http request rejection

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- When a request is rejected, we release its resources.
- A test case was added which sends 3500+ rejected requests and
  causes the server to throw out of memory error prior to this
  fix.

Change-Id: Ia0e3f3e6e2f94a31f296b3491a07f624a4fea604
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1955
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1cec616..1f1d282 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -62,6 +62,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+
 import io.netty.handler.codec.http.HttpResponseStatus;
 
 public class QueryServiceServlet extends AbstractQueryApiServlet {
@@ -93,6 +94,12 @@
             // Servlet methods should not throw exceptions
             // http://cwe.mitre.org/data/definitions/600.html
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+        } catch (Throwable th) {// NOSONAR: Logging and re-throwing
+            try {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, th.getMessage(), th);
+            } catch (Throwable ignored) { // NOSONAR: Logging failure
+            }
+            throw th;
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 6439adb..cb69caa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -27,6 +27,9 @@
   <artifactId>hyracks-http</artifactId>
   <properties>
     <root.dir>${basedir}/../..</root.dir>
+    <direct.mem>-XX:MaxDirectMemorySize</direct.mem>
+    <num.arenas>-Dio.netty.allocator.numDirectArenas</num.arenas>
+    <max.order>-Dio.netty.allocator.maxOrder</max.order>
   </properties>
   <build>
     <plugins>
@@ -34,7 +37,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
-          <argLine>-XX:MaxDirectMemorySize=16M</argLine>
+          <argLine>${direct.mem}=16M ${num.arenas}=4 ${max.order}=7</argLine>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 64be051..47714ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -113,7 +113,7 @@
         } else {
             // There was an error
             if (headerSent) {
-                LOGGER.log(Level.WARNING,"Error after header write of chunked response");
+                LOGGER.log(Level.WARNING, "Error after header write of chunked response");
                 if (error != null) {
                     error.release();
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index aa0f32a..cabb01f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -82,7 +82,11 @@
     }
 
     public void reject() throws IOException {
-        response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
-        response.close();
+        try {
+            response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+            response.close();
+        } finally {
+            request.getHttpRequest().release();
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 6ceafc6..45634ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -60,6 +60,7 @@
     private final Object lock = new Object();
     private final AtomicInteger threadId = new AtomicInteger();
     private final ConcurrentMap<String, Object> ctx;
+    private final LinkedBlockingQueue<Runnable> workQueue;
     private final List<IServlet> servlets;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
@@ -81,8 +82,8 @@
         this.port = port;
         ctx = new ConcurrentHashMap<>();
         servlets = new ArrayList<>();
-        executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>(requestQueueSize),
+        workQueue = new LinkedBlockingQueue<>(requestQueueSize);
+        executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
                 runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
         long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
                 + numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE;
@@ -270,4 +271,8 @@
     protected EventLoopGroup getWorkerGroup() {
         return workerGroup;
     }
+
+    public int getWorkQueueSize() {
+        return workQueue.size();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index 863eddd..bf0452b 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -54,6 +54,11 @@
     }
 
     @Override
+    protected void post(IServletRequest request, IServletResponse response) throws Exception {
+        get(request, response);
+    }
+
+    @Override
     protected void get(IServletRequest request, IServletResponse response) throws Exception {
         response.setStatus(HttpResponseStatus.OK);
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
similarity index 71%
rename from hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java
rename to hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
index 065d803..6bfa0cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
@@ -28,16 +28,39 @@
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 
-public class SlowServlet extends AbstractServlet {
-    public SlowServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+public class SleepyServlet extends AbstractServlet {
+
+    private volatile boolean sleep = true;
+
+    public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
         super(ctx, paths);
     }
 
     @Override
+    protected void post(IServletRequest request, IServletResponse response) throws Exception {
+        get(request, response);
+    }
+
+    @Override
     protected void get(IServletRequest request, IServletResponse response) throws Exception {
         response.setStatus(HttpResponseStatus.OK);
-        Thread.sleep(5000); // NOSONAR
+        if (sleep) {
+            synchronized (this) {
+                while (sleep) {
+                    this.wait();
+                }
+            }
+        }
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
         response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8));
     }
+
+    public synchronized void wakeUp() {
+        sleep = false;
+        notifyAll();
+    }
+
+    public void sleep() {
+        sleep = true;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 6512dc1..66d1b77 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -30,6 +30,9 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
@@ -37,12 +40,13 @@
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
 import org.apache.hyracks.http.servlet.ChattyServlet;
-import org.apache.hyracks.http.servlet.SlowServlet;
+import org.apache.hyracks.http.servlet.SleepyServlet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,16 +56,14 @@
 public class HttpServerTest {
     static final boolean PRINT_TO_CONSOLE = false;
     static final int PORT = 9898;
-    static final int NUM_EXECUTOR_THREADS = 16;
-    static final int SERVER_QUEUE_SIZE = 16;
-    static final int NUM_OF_REQUESTS = 48;
     static final String HOST = "localhost";
     static final String PROTOCOL = "http";
     static final String PATH = "/";
     static final AtomicInteger SUCCESS_COUNT = new AtomicInteger();
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
-    static final List<Thread> THREADS = new ArrayList<>();
+    static final List<Future<Void>> FUTURES = new ArrayList<>();
+    static final ExecutorService executor = Executors.newCachedThreadPool();
 
     @Before
     public void setUp() {
@@ -73,31 +75,106 @@
     @Test
     public void testOverloadingServer() throws Exception {
         WebManager webMgr = new WebManager();
+        int numExecutors = 16;
+        int serverQueueSize = 16;
+        int numRequests = 48;
         HttpServer server =
-                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE);
-        SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH });
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+        SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
         webMgr.start();
+        int expectedSuccess = numExecutors + serverQueueSize;
+        int expectedUnavailable = numRequests - expectedSuccess;
         try {
-            request(NUM_OF_REQUESTS);
-            for (Thread thread : THREADS) {
-                thread.join();
+            request(expectedSuccess);
+            waitTillQueued(server, serverQueueSize);
+            ArrayList<Future<Void>> successSet = started();
+            request(expectedUnavailable);
+            ArrayList<Future<Void>> rejectedSet = started();
+            for (Future<Void> f : rejectedSet) {
+                f.get();
             }
-            Assert.assertEquals(32, SUCCESS_COUNT.get());
-            Assert.assertEquals(16, UNAVAILABLE_COUNT.get());
+            servlet.wakeUp();
+            for (Future<Void> f : successSet) {
+                f.get();
+            }
+            Assert.assertEquals(expectedSuccess, SUCCESS_COUNT.get());
+            Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get());
             Assert.assertEquals(0, OTHER_COUNT.get());
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
         } finally {
             webMgr.stop();
         }
     }
 
+    private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception {
+        int maxAttempts = 5;
+        int attempt = 0;
+        int queued = server.getWorkQueueSize();
+        while (queued != expectedQueued) {
+            attempt++;
+            if (attempt > maxAttempts) {
+                throw new Exception("Number of queued requests (" + queued + ") didn't match the expected number ("
+                        + expectedQueued + ")");
+            }
+            Thread.sleep(1000); // NOSONAR polling is the clean way
+            queued = server.getWorkQueueSize();
+        }
+    }
+
+    @Test
+    public void testReleaseRejectedRequest() throws Exception {
+        WebManager webMgr = new WebManager();
+        int numRequests = 64;
+        int numExecutors = 2;
+        int serverQueueSize = 2;
+        int numPatches = 60;
+        HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+        SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        request(numExecutors + serverQueueSize);
+        ArrayList<Future<Void>> stuck = started();
+        waitTillQueued(server, serverQueueSize);
+        try {
+            try {
+                for (int i = 0; i < numPatches; i++) {
+                    ChattyServlet.printMemUsage();
+                    request(numRequests);
+                    for (Future<Void> f : FUTURES) {
+                        f.get();
+                    }
+                    FUTURES.clear();
+                }
+            } finally {
+                ChattyServlet.printMemUsage();
+                servlet.wakeUp();
+                for (Future<Void> f : stuck) {
+                    f.get();
+                }
+            }
+        } finally {
+            webMgr.stop();
+        }
+    }
+
+    private ArrayList<Future<Void>> started() {
+        ArrayList<Future<Void>> started = new ArrayList<>(FUTURES);
+        FUTURES.clear();
+        return started;
+    }
+
     @Test
     public void testChattyServer() throws Exception {
-        ChattyServlet.printMemUsage();
         int numRequests = 64;
         int numExecutors = 32;
         int serverQueueSize = 32;
+        ChattyServlet.printMemUsage();
         WebManager webMgr = new WebManager();
         HttpServer server =
                 new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
@@ -107,8 +184,8 @@
         webMgr.start();
         try {
             request(numRequests);
-            for (Thread thread : THREADS) {
-                thread.join();
+            for (Future<Void> thread : FUTURES) {
+                thread.get();
             }
             Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
             Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
@@ -120,10 +197,12 @@
 
     @Test
     public void testMalformedString() throws Exception {
+        int numExecutors = 16;
+        int serverQueueSize = 16;
         WebManager webMgr = new WebManager();
         HttpServer server =
-                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE);
-        SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH });
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+        SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
         webMgr.start();
@@ -160,9 +239,9 @@
 
     private void request(int count) {
         for (int i = 0; i < count; i++) {
-            Thread next = new Thread(() -> {
+            Future<Void> next = executor.submit(() -> {
                 try {
-                    HttpUriRequest request = request(null);
+                    HttpUriRequest request = post(null);
                     HttpResponse response = executeHttpRequest(request);
                     if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
                         SUCCESS_COUNT.incrementAndGet();
@@ -183,10 +262,11 @@
                     IOUtils.closeQuietly(in);
                 } catch (Throwable th) {
                     th.printStackTrace();
+                    throw th;
                 }
+                return null;
             });
-            THREADS.add(next);
-            next.start();
+            FUTURES.add(next);
         }
     }
 
@@ -200,10 +280,26 @@
         }
     }
 
-    protected HttpUriRequest request(String query) throws URISyntaxException {
+    protected HttpUriRequest get(String query) throws URISyntaxException {
         URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
         RequestBuilder builder = RequestBuilder.get(uri);
         builder.setCharset(StandardCharsets.UTF_8);
         return builder.build();
     }
+
+    protected HttpUriRequest post(String query) throws URISyntaxException {
+        URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
+        RequestBuilder builder = RequestBuilder.post(uri);
+        StringBuilder str = new StringBuilder();
+        for (int i = 0; i < 32; i++) {
+            str.append("This is a string statement that will be ignored");
+            str.append('\n');
+        }
+        String statement = str.toString();
+        builder.setHeader("Content-type", "application/x-www-form-urlencoded");
+        builder.addParameter("statement", statement);
+        builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
 }