[NO ISSUE][OTH] Enable adding request channel close listener
- user model changes: no
- storage format changes: no
- interface changes: yes
- Introduce IChannelCloseHandler.handle that gets called when
the request channel is closed.
- Add HttpServer.getChannelCloseHandler
- Add IServlet.getChannelCloseHandler
details:
- Previously, we didn't know that an Http client closed the
connection until we try to write and find that the channel has
been closed.
- After this change, the moment the channel is closed, the http
channel close handler is called.
- A test is added with a handler that interrupts the execution.
Change-Id: I42f1857c0158af6f447282cab8fbd600767b08d5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1972
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java
new file mode 100644
index 0000000..4e433ad
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IChannelClosedHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.api;
+
+import java.util.concurrent.Future;
+
+import org.apache.hyracks.http.server.HttpServer;
+
+@FunctionalInterface
+public interface IChannelClosedHandler {
+
+ /**
+ * Handle a request channel closed event
+ *
+ * @param server
+ * the server handling the request
+ * @param servlet
+ * the servlet handling the request
+ * @param task
+ * the task handling the request
+ */
+ void channelClosed(HttpServer server, IServlet servlet, Future<Void> task);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
index 157eef5..186fb0e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/api/IServlet.java
@@ -20,6 +20,8 @@
import java.util.concurrent.ConcurrentMap;
+import org.apache.hyracks.http.server.HttpServer;
+
/**
* Represents a component that handles IServlet requests
*/
@@ -42,4 +44,15 @@
* @param response
*/
void handle(IServletRequest request, IServletResponse response);
+
+ /**
+ * Get the handler for channel close events
+ *
+ * @param server
+ * the http server
+ * @return the handler for channel close events
+ */
+ default IChannelClosedHandler getChannelClosedHandler(HttpServer server) {
+ return server.getChannelClosedHandler();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 8ce1d70..d971a7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.Level;
@@ -62,6 +63,7 @@
private static final int STARTED = 2;
private static final int STOPPING = 3;
// Final members
+ private final IChannelClosedHandler closedHandler;
private final Object lock = new Object();
private final AtomicInteger threadId = new AtomicInteger();
private final ConcurrentMap<String, Object> ctx;
@@ -78,14 +80,25 @@
private Throwable cause;
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
- this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE);
+ this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE, null);
+ }
+
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port,
+ IChannelClosedHandler closeHandler) {
+ this(bossGroup, workerGroup, port, DEFAULT_NUM_EXECUTOR_THREADS, DEFAULT_REQUEST_QUEUE_SIZE, closeHandler);
}
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
int requestQueueSize) {
+ this(bossGroup, workerGroup, port, numExecutorThreads, requestQueueSize, null);
+ }
+
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
+ int requestQueueSize, IChannelClosedHandler closeHandler) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.port = port;
+ this.closedHandler = closeHandler;
ctx = new ConcurrentHashMap<>();
servlets = new ArrayList<>();
workQueue = new LinkedBlockingQueue<>(requestQueueSize);
@@ -378,6 +391,10 @@
return workQueue.size();
}
+ public IChannelClosedHandler getChannelClosedHandler() {
+ return closedHandler;
+ }
+
@Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\",\"port\":" + port + ",\"state\":\"" + getState()
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 9290cdf..2787b30 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -19,8 +19,10 @@
package org.apache.hyracks.http.server;
import java.io.IOException;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import org.apache.hyracks.http.api.IChannelClosedHandler;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -92,12 +94,16 @@
return;
}
handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
- submit();
+ submit(ctx, servlet);
}
- private void submit() throws IOException {
+ private void submit(ChannelHandlerContext ctx, IServlet servlet) throws IOException {
try {
- server.getExecutor(handler).submit(handler);
+ Future<Void> task = server.getExecutor(handler).submit(handler);
+ final IChannelClosedHandler closeHandler = servlet.getChannelClosedHandler(server);
+ if (closeHandler != null) {
+ ctx.channel().closeFuture().addListener(future -> closeHandler.channelClosed(server, servlet, task));
+ }
} catch (RejectedExecutionException e) { // NOSONAR
LOGGER.log(Level.WARN, "Request rejected by server executor service. " + e.getMessage());
handler.reject();
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
index 6bfa0cf..2a5a0a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
@@ -31,6 +31,7 @@
public class SleepyServlet extends AbstractServlet {
private volatile boolean sleep = true;
+ private int numSlept = 0;
public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
super(ctx, paths);
@@ -46,8 +47,11 @@
response.setStatus(HttpResponseStatus.OK);
if (sleep) {
synchronized (this) {
- while (sleep) {
- this.wait();
+ if (sleep) {
+ incrementSleptCount();
+ while (sleep) {
+ this.wait();
+ }
}
}
}
@@ -55,6 +59,15 @@
response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8));
}
+ private void incrementSleptCount() {
+ numSlept++;
+ notifyAll();
+ }
+
+ public int getNumSlept() {
+ return numSlept;
+ }
+
public synchronized void wakeUp() {
sleep = false;
notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
new file mode 100644
index 0000000..17f6f9a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpRequestTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpRequestTask implements Callable<Void> {
+
+ protected final HttpUriRequest request;
+
+ protected HttpRequestTask() throws URISyntaxException {
+ request = post(null);
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ HttpResponse response = executeHttpRequest(request);
+ if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
+ HttpServerTest.SUCCESS_COUNT.incrementAndGet();
+ } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE.code()) {
+ HttpServerTest.UNAVAILABLE_COUNT.incrementAndGet();
+ } else {
+ HttpServerTest.OTHER_COUNT.incrementAndGet();
+ }
+ InputStream in = response.getEntity().getContent();
+ if (HttpServerTest.PRINT_TO_CONSOLE) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ IOUtils.closeQuietly(in);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ return null;
+ }
+
+ protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+ HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+ try {
+ return client.execute(method);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ protected HttpUriRequest get(String query) throws URISyntaxException {
+ URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH,
+ query, null);
+ RequestBuilder builder = RequestBuilder.get(uri);
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
+
+ protected HttpUriRequest post(String query) throws URISyntaxException {
+ URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, HttpServerTest.PORT, HttpServerTest.PATH,
+ query, null);
+ RequestBuilder builder = RequestBuilder.post(uri);
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < 32; i++) {
+ str.append("This is a string statement that will be ignored");
+ str.append('\n');
+ }
+ String statement = str.toString();
+ builder.setHeader("Content-type", "application/x-www-form-urlencoded");
+ builder.addParameter("statement", statement);
+ builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 298d2de..7e6ccf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -19,15 +19,12 @@
package org.apache.hyracks.http.test;
import java.io.BufferedReader;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.Socket;
-import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -35,14 +32,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -69,6 +58,7 @@
static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
static final AtomicInteger OTHER_COUNT = new AtomicInteger();
static final AtomicInteger EXCEPTION_COUNT = new AtomicInteger();
+ static final List<HttpRequestTask> TASKS = new ArrayList<>();
static final List<Future<Void>> FUTURES = new ArrayList<>();
static final ExecutorService executor = Executors.newCachedThreadPool();
@@ -78,6 +68,8 @@
UNAVAILABLE_COUNT.set(0);
OTHER_COUNT.set(0);
EXCEPTION_COUNT.set(0);
+ FUTURES.clear();
+ TASKS.clear();
}
@Test
@@ -303,76 +295,57 @@
}
}
+ @Test
+ public void testInterruptOnClientClose() throws Exception {
+ WebManager webMgr = new WebManager();
+ int numExecutors = 1;
+ int queueSize = 1;
+ HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, queueSize,
+ (reqServer, reqServlet, reqTask) -> reqTask.cancel(true));
+ SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ request(1);
+ synchronized (servlet) {
+ while (servlet.getNumSlept() == 0) {
+ servlet.wait();
+ }
+ }
+ request(1);
+ waitTillQueued(server, 1);
+ FUTURES.remove(0);
+ HttpRequestTask request = TASKS.remove(0);
+ request.request.abort();
+ waitTillQueued(server, 0);
+ synchronized (servlet) {
+ while (servlet.getNumSlept() == 1) {
+ servlet.wait();
+ }
+ }
+ servlet.wakeUp();
+ for (Future<Void> f : FUTURES) {
+ f.get();
+ }
+ FUTURES.clear();
+ } finally {
+ webMgr.stop();
+ }
+ }
+
public static void setPrivateField(Object obj, String filedName, Object value) throws Exception {
Field f = obj.getClass().getDeclaredField(filedName);
f.setAccessible(true);
f.set(obj, value);
}
- private void request(int count) {
+ private void request(int count) throws URISyntaxException {
for (int i = 0; i < count; i++) {
- Future<Void> next = executor.submit(() -> {
- try {
- HttpUriRequest request = post(null);
- HttpResponse response = executeHttpRequest(request);
- if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
- SUCCESS_COUNT.incrementAndGet();
- } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE
- .code()) {
- UNAVAILABLE_COUNT.incrementAndGet();
- } else {
- OTHER_COUNT.incrementAndGet();
- }
- InputStream in = response.getEntity().getContent();
- if (PRINT_TO_CONSOLE) {
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- String line = null;
- while ((line = reader.readLine()) != null) {
- System.out.println(line);
- }
- }
- IOUtils.closeQuietly(in);
- } catch (Throwable th) {
- // Server closed connection before we complete writing..
- EXCEPTION_COUNT.incrementAndGet();
- }
- return null;
- });
+ HttpRequestTask requestTask = new HttpRequestTask();
+ Future<Void> next = executor.submit(requestTask);
FUTURES.add(next);
+ TASKS.add(requestTask);
}
}
-
- public static HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
- HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
- try {
- return client.execute(method);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- public static HttpUriRequest get(String protocol, String host, int port, String path, String query)
- throws URISyntaxException {
- URI uri = new URI(protocol, null, host, port, path, query, null);
- RequestBuilder builder = RequestBuilder.get(uri);
- builder.setCharset(StandardCharsets.UTF_8);
- return builder.build();
- }
-
- protected HttpUriRequest post(String query) throws URISyntaxException {
- URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
- RequestBuilder builder = RequestBuilder.post(uri);
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < 2046; i++) {
- str.append("This is a string statement that will be ignored");
- str.append('\n');
- }
- String statement = str.toString();
- builder.setHeader("Content-type", "application/x-www-form-urlencoded");
- builder.addParameter("statement", statement);
- builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
- builder.setCharset(StandardCharsets.UTF_8);
- return builder.build();
- }
}