[ASTERIXDB-2361][HYR] Memory Leak Due to Netty Close Listeners
- user model changes: no
- storage format changes: no
- interface changes:
- add IServletResponse.notifyChannelInactive()
Change-Id: I40156538d62a3c06b9ccc14338c3f554921a12b8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2579
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
index 1a7c65f..38f2d23 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServletResponse.java
@@ -78,8 +78,12 @@
ChannelFuture lastContentFuture() throws IOException;
/**
- * Notifies the response that the channel has become writable
- * became writable or unwritable. Used for flow control
+ * Notifies the response that the channel has become writable. Used for flow control
*/
void notifyChannelWritable();
+
+ /**
+ * Notifies the response that the channel has become inactive.
+ */
+ void notifyChannelInactive();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index d5f81e5..d4f1b3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -43,12 +43,6 @@
this.response = response;
this.ctx = ctx;
buffer = ctx.alloc().buffer(chunkSize);
- // register listener for channel closed
- ctx.channel().closeFuture().addListener(futureListener -> {
- synchronized (ChunkedNettyOutputStream.this) {
- ChunkedNettyOutputStream.this.notifyAll();
- }
- });
}
@Override
@@ -128,8 +122,8 @@
private synchronized void ensureWritable() throws IOException {
while (!ctx.channel().isWritable()) {
try {
- if (!ctx.channel().isOpen()) {
- throw new IOException("Closed channel");
+ if (!ctx.channel().isActive()) {
+ throw new IOException("Inactive channel");
}
wait();
} catch (InterruptedException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 323a463..5a43d25 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -187,4 +187,9 @@
public void notifyChannelWritable() {
outputStream.resume();
}
+
+ @Override
+ public void notifyChannelInactive() {
+ outputStream.resume();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 598048e..90e33b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -105,4 +105,10 @@
// Do nothing.
// This response is sent as a single piece
}
+
+ @Override
+ public void notifyChannelInactive() {
+ // Do nothing.
+ // This response is sent as a single piece
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index bf8e629..65a082c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -82,6 +82,10 @@
response.notifyChannelWritable();
}
+ public void notifyChannelInactive() {
+ response.notifyChannelInactive();
+ }
+
public void reject() throws IOException {
try {
response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 2787b30..7b3d18a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -44,6 +44,9 @@
protected final T server;
protected final int chunkSize;
protected HttpRequestHandler handler;
+ protected IChannelClosedHandler closeHandler;
+ protected Future<Void> task;
+ protected IServlet servlet;
public HttpServerHandler(T server, int chunkSize) {
this.server = server;
@@ -64,10 +67,24 @@
}
@Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ if (handler != null) {
+ handler.notifyChannelInactive();
+ }
+ if (closeHandler != null) {
+ closeHandler.channelClosed(server, servlet, task);
+ }
+ super.channelInactive(ctx);
+ }
+
+ @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest request = (FullHttpRequest) msg;
+ handler = null;
+ task = null;
+ closeHandler = null;
try {
- IServlet servlet = server.getServlet(request);
+ servlet = server.getServlet(request);
if (servlet == null) {
handleServletNotFound(ctx, request);
} else {
@@ -94,16 +111,13 @@
return;
}
handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
- submit(ctx, servlet);
+ submit(servlet);
}
- private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException {
+ private void submit(IServlet servlet) throws IOException {
try {
- Future<Void> task = server.getExecutor(handler).submit(handler);
- final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server);
- if (closeHandler != null) {
- ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task));
- }
+ task = server.getExecutor(handler).submit(handler);
+ closeHandler = servlet.getChannelClosedHandler(server);
} catch (RejectedExecutionException e) { // NOSONAR
LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage());
handler.reject();