[NO ISSUE][HYR][HTTP] Prevent Netty buffer leaks on close() errors

Change-Id: Ibe5836eb61b3f01b8d95820f20c55269e98a3118
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17624
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 7d04cf2..57f9750 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.util.ComputingAction;
 import org.apache.hyracks.util.IDelay;
 import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.IOThrowingAction;
 import org.apache.hyracks.util.IRetryPolicy;
 import org.apache.hyracks.util.InterruptibleAction;
 import org.apache.hyracks.util.Span;
@@ -188,7 +189,7 @@
         }
     }
 
-    @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
     public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
         Throwable savedT = null;
         boolean suppressedInterrupted = false;
@@ -225,6 +226,43 @@
         }
     }
 
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+    public static void tryIoWithCleanups(IOThrowingAction action, IOThrowingAction... cleanups) throws IOException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (IOThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT instanceof IOException) {
+            throw (IOException) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     /**
      * Runs the supplied action, after suspending any pending interruption. An error will be logged if
      * the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index e00c519..740af2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -25,6 +25,8 @@
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.logging.log4j.Level;
@@ -45,6 +47,7 @@
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.ReferenceCountUtil;
 
 /**
  * A chunked http response. Here is how it is expected to work:
@@ -114,28 +117,40 @@
 
     @Override
     public void close() throws IOException {
-        if (writer != null) {
-            writer.close();
-        } else {
-            outputStream.close();
-        }
-        if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
-            if (!done) {
-                respond(LastHttpContent.EMPTY_LAST_CONTENT);
-            }
-        } else {
-            // There was an error
-            if (headerSent) {
-                LOGGER.log(Level.WARN, "Error after header write of chunked response");
-                if (errorBuf != null) {
-                    errorBuf.release();
+        try {
+            InvokeUtil.tryIoWithCleanups(() -> {
+                if (writer != null) {
+                    writer.close();
+                } else {
+                    outputStream.close();
                 }
-                future = ctx.channel().close().addListener(handler);
-            } else {
-                // we didn't send anything to the user, we need to send an non-chunked error response
-                fullResponse(response.protocolVersion(), response.status(),
-                        errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
-            }
+                if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
+                    if (!done) {
+                        respond(LastHttpContent.EMPTY_LAST_CONTENT);
+                    }
+                } else {
+                    // There was an error
+                    if (headerSent) {
+                        LOGGER.log(Level.WARN, "Error after header write of chunked response");
+                        future = ctx.channel().close().addListener(handler);
+                    } else {
+                        // we didn't send anything to the user, we need to send an non-chunked error response
+                        fullResponse(response.protocolVersion(), response.status(),
+                                errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
+                        // The responsibility of releasing the error buffer is now with the netty pipeline since it is
+                        // forwarded within the http content. We must nullify buffer to avoid releasing the buffer twice.
+                        errorBuf = null;
+                    }
+                }
+            }, outputStream::close, () -> {
+                ReferenceCountUtil.release(errorBuf);
+                // We must nullify buffer to avoid releasing the buffer twice in case of duplicate close()
+                errorBuf = null;
+            });
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
         }
         done = true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
new file mode 100644
index 0000000..b9430d8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOThrowingAction {
+    void run() throws IOException; // NOSONAR
+
+    static ComputingAction<Void> asComputingAction(IOThrowingAction action) {
+        return () -> {
+            action.run();
+            return null;
+        };
+    }
+}