[ASTERIXDB-2361][HYR] Memory Leak Due to Netty Close Listeners

- user model changes: no
- storage format changes: no
- interface changes:
  - add IServletResponse.notifyChannelInactive()

Change-Id: I40156538d62a3c06b9ccc14338c3f554921a12b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2579
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
index 1a7c65f..38f2d23 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
@@ -78,8 +78,12 @@
     ChannelFuture lastContentFuture() throws IOException;
 
     /**
-     * Notifies the response that the channel has become writable
-     * became writable or unwritable. Used for flow control
+     * Notifies the response that the channel has become writable. Used for flow control
      */
     void notifyChannelWritable();
+
+    /**
+     * Notifies the response that the channel has become inactive.
+     */
+    void notifyChannelInactive();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index d5f81e5..d4f1b3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -43,12 +43,6 @@
         this.response = response;
         this.ctx = ctx;
         buffer = ctx.alloc().buffer(chunkSize);
-        // register listener for channel closed
-        ctx.channel().closeFuture().addListener(futureListener -> {
-            synchronized (ChunkedNettyOutputStream.this) {
-                ChunkedNettyOutputStream.this.notifyAll();
-            }
-        });
     }
 
     @Override
@@ -128,8 +122,8 @@
     private synchronized void ensureWritable() throws IOException {
         while (!ctx.channel().isWritable()) {
             try {
-                if (!ctx.channel().isOpen()) {
-                    throw new IOException("Closed channel");
+                if (!ctx.channel().isActive()) {
+                    throw new IOException("Inactive channel");
                 }
                 wait();
             } catch (InterruptedException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 323a463..5a43d25 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -187,4 +187,9 @@
     public void notifyChannelWritable() {
         outputStream.resume();
     }
+
+    @Override
+    public void notifyChannelInactive() {
+        outputStream.resume();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 598048e..90e33b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -105,4 +105,10 @@
         // Do nothing.
         // This response is sent as a single piece
     }
+
+    @Override
+    public void notifyChannelInactive() {
+        // Do nothing.
+        // This response is sent as a single piece
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index bf8e629..65a082c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -82,6 +82,10 @@
         response.notifyChannelWritable();
     }
 
+    public void notifyChannelInactive() {
+        response.notifyChannelInactive();
+    }
+
     public void reject() throws IOException {
         try {
             response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 2787b30..7b3d18a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -44,6 +44,9 @@
     protected final T server;
     protected final int chunkSize;
     protected HttpRequestHandler handler;
+    protected IChannelClosedHandler closeHandler;
+    protected Future<Void> task;
+    protected IServlet servlet;
 
     public HttpServerHandler(T server, int chunkSize) {
         this.server = server;
@@ -64,10 +67,24 @@
     }
 
     @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        if (handler != null) {
+            handler.notifyChannelInactive();
+        }
+        if (closeHandler != null) {
+            closeHandler.channelClosed(server, servlet, task);
+        }
+        super.channelInactive(ctx);
+    }
+
+    @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
         FullHttpRequest request = (FullHttpRequest) msg;
+        handler = null;
+        task = null;
+        closeHandler = null;
         try {
-            IServlet servlet = server.getServlet(request);
+            servlet = server.getServlet(request);
             if (servlet == null) {
                 handleServletNotFound(ctx, request);
             } else {
@@ -94,16 +111,13 @@
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
-        submit(ctx, servlet);
+        submit(servlet);
     }
 
-    private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException {
+    private void submit(IServlet servlet) throws IOException {
         try {
-            Future<Void> task = server.getExecutor(handler).submit(handler);
-            final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server);
-            if (closeHandler != null) {
-                ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task));
-            }
+            task = server.getExecutor(handler).submit(handler);
+            closeHandler = servlet.getChannelClosedHandler(server);
         } catch (RejectedExecutionException e) { // NOSONAR
             LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage());
             handler.reject();