[NO ISSUE][OTH] Improve Http Server

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Log estimated input memory budget
- Ensure all allocated input memory buffers are 4K. This reduces
  the chance of memory allocation to go beyond budget. This also
  allows input and output to share buffers of this size since
  it is the size of choice for reading and writing.
- Reject requests that go beyond server capacity before reading
  them which reduces wasted resources.
- Allow configurations of number of bosses and workers for an
  Http web manager.
- Push the limit further in Http server tests through:
  -- Increase the size of the single request to ~100KB.
  -- Increase the number of rejected requests.

Change-Id: I7adcd59047805dc384e1c119191eff995c6e9a7a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1991
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 0066b77..e4f0777 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -88,7 +88,9 @@
                 try {
                     flush();
                 } finally {
-                    buffer.release();
+                    if (buffer != null) {
+                        buffer.release();
+                    }
                 }
             } else {
                 response.fullReponse(buffer);
@@ -101,12 +103,16 @@
     @Override
     public void flush() throws IOException {
         ensureWritable();
-        if (buffer.readableBytes() > 0) {
+        if (buffer != null && buffer.readableBytes() > 0) {
             if (response.status() == HttpResponseStatus.OK) {
                 int size = buffer.capacity();
                 response.beforeFlush();
                 DefaultHttpContent content = new DefaultHttpContent(buffer);
-                ctx.write(content, ctx.channel().voidPromise());
+                ctx.writeAndFlush(content, ctx.channel().voidPromise());
+                // The responisbility of releasing the buffer is now with the netty pipeline since it is forwarded
+                // within the http content. We must nullify buffer before we allocate the next one to avoid
+                // releasing the buffer twice in case the allocation call fails.
+                buffer = null;
                 buffer = ctx.alloc().buffer(size);
             } else {
                 ByteBuf aBuffer = ctx.alloc().buffer(buffer.readableBytes());
@@ -120,7 +126,6 @@
     private synchronized void ensureWritable() throws IOException {
         while (!ctx.channel().isWritable()) {
             try {
-                ctx.flush();
                 if (!ctx.channel().isOpen()) {
                     throw new IOException("Closed channel");
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
new file mode 100644
index 0000000..e1f9e5a49
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestCapacityController.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.server;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.internal.PromiseNotificationUtil;
+
+/**
+ * A handler to do input control... as a single pipeline can create many requests
+ * The remaining capacity of the server executor queue is incremented only when a request has been fully read
+ * Therefore, there is a window where requests can be read and get rejected later when they are
+ * submitted to the server executor
+ */
+public class HttpRequestCapacityController extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOGGER = Logger.getLogger(HttpRequestCapacityController.class.getName());
+    private final HttpServer server;
+    private boolean overloaded = false;
+
+    public HttpRequestCapacityController(HttpServer server) {
+        this.server = server;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (overloaded) {
+            ReferenceCountUtil.release(msg);
+            return;
+        }
+        if (overloaded()) {
+            ReferenceCountUtil.release(msg);
+            reject(ctx);
+            return;
+        } else {
+            super.channelRead(ctx, msg);
+        }
+    }
+
+    public static void reject(ChannelHandlerContext ctx) {
+        HttpResponseEncoder encoder = new HttpResponseEncoder();
+        ChannelPromise promise = ctx.newPromise();
+        promise.addListener(ChannelFutureListener.CLOSE);
+        DefaultFullHttpResponse response =
+                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE);
+        try {
+            encoder.write(ctx, response, ctx.voidPromise());
+            ctx.writeAndFlush(ctx.alloc().buffer(0), promise);
+        } catch (Throwable th) {//NOSONAR
+            try {
+                LOGGER.log(Level.SEVERE, "Failure during request rejection", th);
+            } catch (Throwable loggingFailure) {//NOSONAR
+            }
+            PromiseNotificationUtil.tryFailure(promise, th, null);
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        if (overloaded()) {
+            reject(ctx);
+            return;
+        }
+        // We disable auto read to avoid reading at all if we can't handle any more requests
+        ctx.read();
+        super.channelActive(ctx);
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        ctx.read();
+        super.channelReadComplete(ctx);
+    }
+
+    private boolean overloaded() {
+        overloaded = overloaded || server.getExecutor().getQueue().remainingCapacity() == 0;
+        return overloaded;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 56f454f..44d4dfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -23,7 +23,6 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +37,7 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.FullHttpRequest;
@@ -50,6 +50,9 @@
     private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
     protected static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
             new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, HIGH_WRITE_BUFFER_WATER_MARK);
+    protected static final int RECEIVE_BUFFER_SIZE = 4096;
+    protected static final int DEFAULT_NUM_EXECUTOR_THREADS = 16;
+    protected static final int DEFAULT_REQUEST_QUEUE_SIZE = 256;
     private static final Logger LOGGER = Logger.getLogger(HttpServer.class.getName());
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
@@ -65,14 +68,14 @@
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final int port;
-    private final ExecutorService executor;
+    private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
     private Channel channel;
     private Throwable cause;
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
-        this(bossGroup, workerGroup, port, 16, 256);
+        this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE);
     }
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
@@ -87,7 +90,14 @@
                 runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
         long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
                 + numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE;
-        LOGGER.log(Level.INFO, "The direct memory budget for this server is " + directMemoryBudget + " bytes");
+        LOGGER.log(Level.INFO, "The output direct memory budget for this server is " + directMemoryBudget + " bytes");
+        long inputBudgetEstimate =
+                (long) HttpServerInitializer.MAX_REQUEST_INITIAL_LINE_LENGTH * (requestQueueSize + numExecutorThreads);
+        inputBudgetEstimate = inputBudgetEstimate * 2;
+        LOGGER.log(Level.INFO,
+                "The \"estimated\" input direct memory budget for this server is " + inputBudgetEstimate + " bytes");
+        // Having multiple arenas, memory fragments, and local thread cached buffers
+        // can cause the input memory usage to exceed estimate and custom buffer allocator must be used to avoid this
     }
 
     public final void start() throws Exception { // NOSONAR
@@ -199,6 +209,8 @@
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
+                .childOption(ChannelOption.AUTO_READ, Boolean.FALSE)
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
@@ -264,7 +276,7 @@
         return new HttpServerHandler<>(this, chunkSize);
     }
 
-    public ExecutorService getExecutor() {
+    public ThreadPoolExecutor getExecutor() {
         return executor;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
index a32da39..bc173fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerInitializer.java
@@ -31,7 +31,7 @@
     public static final int MAX_REQUEST_HEADER_SIZE = 262144;
     public static final int MAX_REQUEST_INITIAL_LINE_LENGTH = 131072;
     public static final int RESPONSE_CHUNK_SIZE = 4096;
-    private HttpServer server;
+    private final HttpServer server;
 
     public HttpServerInitializer(HttpServer server) {
         this.server = server;
@@ -40,6 +40,7 @@
     @Override
     public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
+        p.addLast(new HttpRequestCapacityController(server));
         p.addLast(new HttpRequestDecoder(MAX_REQUEST_INITIAL_LINE_LENGTH, MAX_REQUEST_HEADER_SIZE,
                 MAX_REQUEST_CHUNK_SIZE));
         p.addLast(new HttpResponseEncoder());
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
index 4a09f78..55741e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/WebManager.java
@@ -30,10 +30,38 @@
     private final EventLoopGroup bosses;
     private final EventLoopGroup workers;
 
+    /**
+     * Create a web manager with number of bosses = 1
+     * and number of workers = MultithreadEventLoopGroup.DEFAULT_EVENT_LOOP_THREADS
+     * The default can be set using -Dio.netty.eventLoopThreads
+     * Otherwise, it is set to Runtime.getRuntime().availableProcessors() * 2
+     */
     public WebManager() {
+        this(1, 0);
+    }
+
+    /**
+     * Create a web manager with number of bosses = 1 and number of workers = numWorkers
+     *
+     * @param numWorkers
+     *            number of worker threads
+     */
+    public WebManager(int numWorkers) {
+        this(1, numWorkers);
+    }
+
+    /**
+     * Create a web manager with number of bosses = numBosses and number of workers = numWorkers
+     *
+     * @param numBosses
+     *            number of boss threads
+     * @param numWorkers
+     *            number of worker threads
+     */
+    public WebManager(int numBosses, int numWorkers) {
         servers = new ArrayList<>();
-        bosses = new NioEventLoopGroup(1);
-        workers = new NioEventLoopGroup();
+        bosses = new NioEventLoopGroup(numBosses);
+        workers = new NioEventLoopGroup(numWorkers);
     }
 
     public List<HttpServer> getServers() {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index fa3cc57..8240bce 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -19,9 +19,16 @@
 package org.apache.hyracks.http.server.utils;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.regex.Pattern;
 
 import org.apache.hyracks.http.api.IServletRequest;
@@ -29,13 +36,17 @@
 import org.apache.hyracks.http.server.BaseRequest;
 import org.apache.hyracks.http.server.FormUrlEncodedRequest;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpRequest;
+import io.netty.util.internal.PlatformDependent;
 
 public class HttpUtil {
 
+    private static final Logger LOGGER = Logger.getLogger(HttpUtil.class.getName());
     private static final Pattern PARENT_DIR = Pattern.compile("/[^./]+/\\.\\./");
+    private static long maxMemUsage = 0L;
 
     private HttpUtil() {
     }
@@ -145,4 +156,53 @@
         return clusterURL;
     }
 
+    @SuppressWarnings("restriction")
+    public static synchronized void printMemUsage() {
+        StringBuilder report = new StringBuilder();
+        report.append("sun.misc.VM.maxDirectMemory: ");
+        report.append(sun.misc.VM.maxDirectMemory());
+        report.append('\n');
+        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): ");
+        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
+        report.append('\n');
+        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): ");
+        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
+        report.append('\n');
+        report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
+        report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
+        report.append('\n');
+        report.append("---------------------------- Beans ----------------------------");
+        report.append('\n');
+        List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
+        for (MemoryPoolMXBean bean : memPoolBeans) {
+            if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
+                report.append(bean.getName());
+                report.append(": ");
+                report.append(bean.getUsage());
+                report.append('\n');
+            }
+        }
+        report.append("---------------------------- Netty ----------------------------");
+        report.append('\n');
+        try {
+            Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
+            field.setAccessible(true);
+            AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
+            long used = usedDirectMemory.get();
+            report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            report.append(used);
+            report.append('\n');
+            report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
+            maxMemUsage = Math.max(maxMemUsage, used);
+            report.append(maxMemUsage);
+            report.append('\n');
+            report.append('\n');
+        } catch (Throwable th) { // NOSONAR
+            LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th);
+            return;
+        }
+        report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------");
+        report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
+        LOGGER.log(Level.INFO, report.toString());
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index bf0452b..5bd2e38 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -18,13 +18,7 @@
  */
 package org.apache.hyracks.http.servlet;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.management.MemoryType;
-import java.lang.reflect.Field;
-import java.util.List;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -33,13 +27,10 @@
 import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.util.internal.PlatformDependent;
 
 public class ChattyServlet extends AbstractServlet {
     private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName());
-    private static long MAX = 0L;
     private byte[] bytes;
 
     public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
@@ -66,57 +57,6 @@
         for (int i = 0; i < 100; i++) {
             response.outputStream().write(bytes);
         }
-        printMemUsage();
-    }
-
-    @SuppressWarnings("restriction")
-    public synchronized static void printMemUsage() {
-        StringBuilder report = new StringBuilder();
-        report.append("sun.misc.VM.maxDirectMemory: ");
-        report.append(sun.misc.VM.maxDirectMemory());
-        report.append('\n');
-        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed(): ");
-        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed());
-        report.append('\n');
-        report.append("sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity(): ");
-        report.append(sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getTotalCapacity());
-        report.append('\n');
-        report.append("ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage(): ");
-        report.append(ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
-        report.append('\n');
-        report.append("---------------------------- Beans ----------------------------");
-        report.append('\n');
-        List<MemoryPoolMXBean> memPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
-        for (MemoryPoolMXBean bean : memPoolBeans) {
-            if (bean.isValid() && bean.getType() == MemoryType.NON_HEAP) {
-                report.append(bean.getName());
-                report.append(": ");
-                report.append(bean.getUsage());
-                report.append('\n');
-            }
-        }
-        report.append("---------------------------- Netty ----------------------------");
-        report.append('\n');
-        try {
-            Field field = PlatformDependent.class.getDeclaredField("DIRECT_MEMORY_COUNTER");
-            field.setAccessible(true);
-            AtomicLong usedDirectMemory = (AtomicLong) field.get(null);
-            long used = usedDirectMemory.get();
-            report.append("Current PlatformDependent.DIRECT_MEMORY_COUNTER: ");
-            report.append(used);
-            report.append('\n');
-            report.append("Maximum PlatformDependent.DIRECT_MEMORY_COUNTER: ");
-            MAX = Math.max(MAX, used);
-            report.append(MAX);
-            report.append('\n');
-            report.append('\n');
-        } catch (Throwable th) {
-            th.printStackTrace();
-            LOGGER.log(Level.WARNING, "Failed to access PlatformDependent.DIRECT_MEMORY_COUNTER", th);
-            return;
-        }
-        report.append("--------------- PooledByteBufAllocator.DEFAULT ----------------");
-        report.append(PooledByteBufAllocator.DEFAULT.dumpStats());
-        LOGGER.log(Level.INFO, report.toString());
+        HttpUtil.printMemUsage();
     }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 66d1b77..a3048ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -45,6 +45,7 @@
 import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
 import org.apache.hyracks.http.server.HttpServer;
 import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.hyracks.http.servlet.ChattyServlet;
 import org.apache.hyracks.http.servlet.SleepyServlet;
 import org.junit.Assert;
@@ -62,6 +63,7 @@
     static final AtomicInteger SUCCESS_COUNT = new AtomicInteger();
     static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
     static final AtomicInteger OTHER_COUNT = new AtomicInteger();
+    static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger();
     static final List<Future<Void>> FUTURES = new ArrayList<>();
     static final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -70,6 +72,7 @@
         SUCCESS_COUNT.set(0);
         UNAVAILABLE_COUNT.set(0);
         OTHER_COUNT.set(0);
+        EXCEPTION_COUNT.set(0);
     }
 
     @Test
@@ -77,7 +80,7 @@
         WebManager webMgr = new WebManager();
         int numExecutors = 16;
         int serverQueueSize = 16;
-        int numRequests = 48;
+        int numRequests = 128;
         HttpServer server =
                 new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
         SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
@@ -100,7 +103,9 @@
                 f.get();
             }
             Assert.assertEquals(expectedSuccess, SUCCESS_COUNT.get());
-            Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get());
+            Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get() + EXCEPTION_COUNT.get());
+            System.err.println("Number of rejections: " + UNAVAILABLE_COUNT.get());
+            System.err.println("Number of exceptions: " + EXCEPTION_COUNT.get());
             Assert.assertEquals(0, OTHER_COUNT.get());
         } catch (Throwable th) {
             th.printStackTrace();
@@ -111,7 +116,7 @@
     }
 
     private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception {
-        int maxAttempts = 5;
+        int maxAttempts = 15;
         int attempt = 0;
         int queued = server.getWorkQueueSize();
         while (queued != expectedQueued) {
@@ -144,7 +149,7 @@
         try {
             try {
                 for (int i = 0; i < numPatches; i++) {
-                    ChattyServlet.printMemUsage();
+                    HttpUtil.printMemUsage();
                     request(numRequests);
                     for (Future<Void> f : FUTURES) {
                         f.get();
@@ -152,14 +157,17 @@
                     FUTURES.clear();
                 }
             } finally {
-                ChattyServlet.printMemUsage();
+                HttpUtil.printMemUsage();
                 servlet.wakeUp();
                 for (Future<Void> f : stuck) {
                     f.get();
                 }
             }
         } finally {
+            System.err.println("Number of rejections: " + UNAVAILABLE_COUNT.get());
+            System.err.println("Number of exceptions: " + EXCEPTION_COUNT.get());
             webMgr.stop();
+            HttpUtil.printMemUsage();
         }
     }
 
@@ -174,7 +182,7 @@
         int numRequests = 64;
         int numExecutors = 32;
         int serverQueueSize = 32;
-        ChattyServlet.printMemUsage();
+        HttpUtil.printMemUsage();
         WebManager webMgr = new WebManager();
         HttpServer server =
                 new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
@@ -191,7 +199,9 @@
             Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
             Assert.assertEquals(0, OTHER_COUNT.get());
         } finally {
+            HttpUtil.printMemUsage();
             webMgr.stop();
+            HttpUtil.printMemUsage();
         }
     }
 
@@ -261,8 +271,8 @@
                     }
                     IOUtils.closeQuietly(in);
                 } catch (Throwable th) {
-                    th.printStackTrace();
-                    throw th;
+                    // Server closed connection before we complete writing..
+                    EXCEPTION_COUNT.incrementAndGet();
                 }
                 return null;
             });
@@ -291,7 +301,7 @@
         URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
         RequestBuilder builder = RequestBuilder.post(uri);
         StringBuilder str = new StringBuilder();
-        for (int i = 0; i < 32; i++) {
+        for (int i = 0; i < 2046; i++) {
             str.append("This is a string statement that will be ignored");
             str.append('\n');
         }