Add HTTP server overload test
Change-Id: I3f6e594b2acb4b5a8ad3118c9b492d0767980e61
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1531
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index fb2a39d..af61ce8 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -33,5 +33,20 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index 418cd26..aa0f32a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -81,4 +81,8 @@
response.notifyChannelWritable();
}
+ public void reject() throws IOException {
+ response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+ response.close();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index ad8ff41..2f35b6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -24,7 +24,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,12 +68,18 @@
private Throwable cause;
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
+ this(bossGroup, workerGroup, port, 16, 256);
+ }
+
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, int numExecutorThreads,
+ int requestQueueSize) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.port = port;
ctx = new ConcurrentHashMap<>();
servlets = new ArrayList<>();
- executor = Executors.newFixedThreadPool(16,
+ executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(requestQueueSize),
runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index c0e7353..743800e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.http.server;
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -67,7 +69,7 @@
ctx.write(notFound).addListener(ChannelFutureListener.CLOSE);
} else {
handler = new HttpRequestHandler(ctx, servlet, HttpUtil.toServletRequest(request), chunkSize);
- server.getExecutor().submit(handler);
+ submit();
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", e);
@@ -75,6 +77,15 @@
}
}
+ private void submit() throws IOException {
+ try {
+ server.getExecutor().submit(handler);
+ } catch (RejectedExecutionException e) { // NOSONAR
+ LOGGER.log(Level.WARNING, "Request rejected by server executor service. " + e.getMessage());
+ handler.reject();
+ }
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java
new file mode 100644
index 0000000..065d803
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.servlet;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class SlowServlet extends AbstractServlet {
+ public SlowServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+ super(ctx, paths);
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ response.setStatus(HttpResponseStatus.OK);
+ Thread.sleep(5000); // NOSONAR
+ HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+ response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
new file mode 100644
index 0000000..20e6fe7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.test;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.servlet.SlowServlet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class HttpServerTest {
+ static final int PORT = 9898;
+ static final int NUM_EXECUTOR_THREADS = 16;
+ static final int SERVER_QUEUE_SIZE = 16;
+ static final int NUM_OF_REQUESTS = 48;
+ static final String HOST = "localhost";
+ static final String PROTOCOL = "http";
+ static final String PATH = "/";
+ static final AtomicInteger SUCCESS_COUNT = new AtomicInteger();
+ static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
+ static final AtomicInteger OTHER_COUNT = new AtomicInteger();
+ static final List<Thread> THREADS = new ArrayList<>();
+
+ @Test
+ public void testOverloadingServer() throws Exception {
+ WebManager webMgr = new WebManager();
+ HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE);
+ SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ request(NUM_OF_REQUESTS);
+ for (Thread thread : THREADS) {
+ thread.join();
+ }
+ Assert.assertEquals(32, SUCCESS_COUNT.get());
+ Assert.assertEquals(16, UNAVAILABLE_COUNT.get());
+ Assert.assertEquals(0, OTHER_COUNT.get());
+ } finally {
+ webMgr.stop();
+ }
+ }
+
+ private void request(int count) {
+ for (int i = 0; i < count; i++) {
+ Thread next = new Thread(() -> {
+ try {
+ HttpUriRequest request = request();
+ HttpResponse response = executeHttpRequest(request);
+ if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
+ SUCCESS_COUNT.incrementAndGet();
+ } else if (response.getStatusLine().getStatusCode() == HttpResponseStatus.SERVICE_UNAVAILABLE
+ .code()) {
+ UNAVAILABLE_COUNT.incrementAndGet();
+ } else {
+ OTHER_COUNT.incrementAndGet();
+ }
+ InputStream responseStream = response.getEntity().getContent();
+ IOUtils.closeQuietly(responseStream);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
+ });
+ THREADS.add(next);
+ next.start();
+ }
+ }
+
+ protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+ HttpClient client = HttpClients.custom().setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE).build();
+ try {
+ return client.execute(method);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ protected HttpUriRequest request() throws URISyntaxException {
+ URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, null, null);
+ RequestBuilder builder = RequestBuilder.get(uri);
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
+}