[NO ISSUE][HYR][HTTP] Prevent Netty buffer leaks on close() errors
Change-Id: Ibe5836eb61b3f01b8d95820f20c55269e98a3118
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17624
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 7d04cf2..57f9750 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.util.ComputingAction;
import org.apache.hyracks.util.IDelay;
import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.IOThrowingAction;
import org.apache.hyracks.util.IRetryPolicy;
import org.apache.hyracks.util.InterruptibleAction;
import org.apache.hyracks.util.Span;
@@ -188,7 +189,7 @@
}
}
- @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
Throwable savedT = null;
boolean suppressedInterrupted = false;
@@ -225,6 +226,43 @@
}
}
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+ public static void tryIoWithCleanups(IOThrowingAction action, IOThrowingAction... cleanups) throws IOException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (IOThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ if (savedT instanceof Error) {
+ throw (Error) savedT;
+ } else if (savedT instanceof IOException) {
+ throw (IOException) savedT;
+ } else {
+ throw HyracksDataException.create(savedT);
+ }
+ }
+
/**
* Runs the supplied action, after suspending any pending interruption. An error will be logged if
* the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index e00c519..740af2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -25,6 +25,8 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.logging.log4j.Level;
@@ -45,6 +47,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.ReferenceCountUtil;
/**
* A chunked http response. Here is how it is expected to work:
@@ -114,28 +117,40 @@
@Override
public void close() throws IOException {
- if (writer != null) {
- writer.close();
- } else {
- outputStream.close();
- }
- if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
- if (!done) {
- respond(LastHttpContent.EMPTY_LAST_CONTENT);
- }
- } else {
- // There was an error
- if (headerSent) {
- LOGGER.log(Level.WARN, "Error after header write of chunked response");
- if (errorBuf != null) {
- errorBuf.release();
+ try {
+ InvokeUtil.tryIoWithCleanups(() -> {
+ if (writer != null) {
+ writer.close();
+ } else {
+ outputStream.close();
}
- future = ctx.channel().close().addListener(handler);
- } else {
- // we didn't send anything to the user, we need to send an non-chunked error response
- fullResponse(response.protocolVersion(), response.status(),
- errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
- }
+ if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
+ if (!done) {
+ respond(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+ } else {
+ // There was an error
+ if (headerSent) {
+ LOGGER.log(Level.WARN, "Error after header write of chunked response");
+ future = ctx.channel().close().addListener(handler);
+ } else {
+ // we didn't send anything to the user, we need to send an non-chunked error response
+ fullResponse(response.protocolVersion(), response.status(),
+ errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
+ // The responsibility of releasing the error buffer is now with the netty pipeline since it is
+ // forwarded within the http content. We must nullify buffer to avoid releasing the buffer twice.
+ errorBuf = null;
+ }
+ }
+ }, outputStream::close, () -> {
+ ReferenceCountUtil.release(errorBuf);
+ // We must nullify buffer to avoid releasing the buffer twice in case of duplicate close()
+ errorBuf = null;
+ });
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
}
done = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
new file mode 100644
index 0000000..b9430d8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOThrowingAction {
+ void run() throws IOException; // NOSONAR
+
+ static ComputingAction<Void> asComputingAction(IOThrowingAction action) {
+ return () -> {
+ action.run();
+ return null;
+ };
+ }
+}