[ASTERIXDB-2018][NET] Add HTTP close channel listener

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Add a listener on HTTP channel to notify waiters when the
channel is closed.

Change-Id: I08e4890880a70d42e3e181366a5b0252102d0ea2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1917
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 6509dcc..3456343 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -40,6 +40,12 @@
         this.response = response;
         this.ctx = ctx;
         buffer = ctx.alloc().buffer(chunkSize);
+        // register listener for channel closed
+        ctx.channel().closeFuture().addListener(futureListener -> {
+            synchronized (ChunkedNettyOutputStream.this) {
+                ChunkedNettyOutputStream.this.notifyAll();
+            }
+        });
     }
 
     @Override
@@ -76,8 +82,11 @@
     public void close() throws IOException {
         if (!closed) {
             if (response.isHeaderSent() || response.status() != HttpResponseStatus.OK) {
-                flush();
-                buffer.release();
+                try {
+                    flush();
+                } finally {
+                    buffer.release();
+                }
             } else {
                 response.fullReponse(buffer);
             }
@@ -121,6 +130,9 @@
         while (!ctx.channel().isWritable()) {
             try {
                 ctx.flush();
+                if (!ctx.channel().isOpen()) {
+                    throw new IOException("Closed channel");
+                }
                 wait();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();