[ASTERIXDB-2052][OTH] Release resources on http request rejection
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- When a request is rejected, we release its resources.
- A test case was added which sends 3500+ rejected requests and
causes the server to throw out of memory error prior to this
fix.
Change-Id: Ia0e3f3e6e2f94a31f296b3491a07f624a4fea604
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1955
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1cec616..1f1d282 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -62,6 +62,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+
import io.netty.handler.codec.http.HttpResponseStatus;
public class QueryServiceServlet extends AbstractQueryApiServlet {
@@ -93,6 +94,12 @@
// Servlet methods should not throw exceptions
// http://cwe.mitre.org/data/definitions/600.html
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
+ } catch (Throwable th) {// NOSONAR: Logging and re-throwing
+ try {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, th.getMessage(), th);
+ } catch (Throwable ignored) { // NOSONAR: Logging failure
+ }
+ throw th;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 6439adb..cb69caa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -27,6 +27,9 @@
<artifactId>hyracks-http</artifactId>
<properties>
<root.dir>${basedir}/../..</root.dir>
+ <direct.mem>-XX:MaxDirectMemorySize</direct.mem>
+ <num.arenas>-Dio.netty.allocator.numDirectArenas</num.arenas>
+ <max.order>-Dio.netty.allocator.maxOrder</max.order>
</properties>
<build>
<plugins>
@@ -34,7 +37,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <argLine>-XX:MaxDirectMemorySize=16M</argLine>
+ <argLine>${direct.mem}=16M ${num.arenas}=4 ${max.order}=7</argLine>
</configuration>
</plugin>
</plugins>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index 64be051..47714ae 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -113,7 +113,7 @@
} else {
// There was an error
if (headerSent) {
- LOGGER.log(Level.WARNING,"Error after header write of chunked response");
+ LOGGER.log(Level.WARNING, "Error after header write of chunked response");
if (error != null) {
error.release();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index aa0f32a..cabb01f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -82,7 +82,11 @@
}
public void reject() throws IOException {
- response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
- response.close();
+ try {
+ response.setStatus(HttpResponseStatus.SERVICE_UNAVAILABLE);
+ response.close();
+ } finally {
+ request.getHttpRequest().release();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 6ceafc6..45634ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -60,6 +60,7 @@
private final Object lock = new Object();
private final AtomicInteger threadId = new AtomicInteger();
private final ConcurrentMap<String, Object> ctx;
+ private final LinkedBlockingQueue<Runnable> workQueue;
private final List<IServlet> servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
@@ -81,8 +82,8 @@
this.port = port;
ctx = new ConcurrentHashMap<>();
servlets = new ArrayList<>();
- executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(requestQueueSize),
+ workQueue = new LinkedBlockingQueue<>(requestQueueSize);
+ executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
runnable -> new Thread(runnable, "HttpExecutor(port:" + port + ")-" + threadId.getAndIncrement()));
long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
+ numExecutorThreads * HttpServerInitializer.RESPONSE_CHUNK_SIZE;
@@ -270,4 +271,8 @@
protected EventLoopGroup getWorkerGroup() {
return workerGroup;
}
+
+ public int getWorkQueueSize() {
+ return workQueue.size();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
index 863eddd..bf0452b 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -54,6 +54,11 @@
}
@Override
+ protected void post(IServletRequest request, IServletResponse response) throws Exception {
+ get(request, response);
+ }
+
+ @Override
protected void get(IServletRequest request, IServletResponse response) throws Exception {
response.setStatus(HttpResponseStatus.OK);
HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
similarity index 71%
rename from hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java
rename to hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
index 065d803..6bfa0cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SlowServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/SleepyServlet.java
@@ -28,16 +28,39 @@
import io.netty.handler.codec.http.HttpResponseStatus;
-public class SlowServlet extends AbstractServlet {
- public SlowServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+public class SleepyServlet extends AbstractServlet {
+
+ private volatile boolean sleep = true;
+
+ public SleepyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
super(ctx, paths);
}
@Override
+ protected void post(IServletRequest request, IServletResponse response) throws Exception {
+ get(request, response);
+ }
+
+ @Override
protected void get(IServletRequest request, IServletResponse response) throws Exception {
response.setStatus(HttpResponseStatus.OK);
- Thread.sleep(5000); // NOSONAR
+ if (sleep) {
+ synchronized (this) {
+ while (sleep) {
+ this.wait();
+ }
+ }
+ }
HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
response.outputStream().write("I am playing hard to get".getBytes(StandardCharsets.UTF_8));
}
+
+ public synchronized void wakeUp() {
+ sleep = false;
+ notifyAll();
+ }
+
+ public void sleep() {
+ sleep = true;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 6512dc1..66d1b77 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -30,6 +30,9 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
@@ -37,12 +40,13 @@
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
import org.apache.hyracks.http.servlet.ChattyServlet;
-import org.apache.hyracks.http.servlet.SlowServlet;
+import org.apache.hyracks.http.servlet.SleepyServlet;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,16 +56,14 @@
public class HttpServerTest {
static final boolean PRINT_TO_CONSOLE = false;
static final int PORT = 9898;
- static final int NUM_EXECUTOR_THREADS = 16;
- static final int SERVER_QUEUE_SIZE = 16;
- static final int NUM_OF_REQUESTS = 48;
static final String HOST = "localhost";
static final String PROTOCOL = "http";
static final String PATH = "/";
static final AtomicInteger SUCCESS_COUNT = new AtomicInteger();
static final AtomicInteger UNAVAILABLE_COUNT = new AtomicInteger();
static final AtomicInteger OTHER_COUNT = new AtomicInteger();
- static final List<Thread> THREADS = new ArrayList<>();
+ static final List<Future<Void>> FUTURES = new ArrayList<>();
+ static final ExecutorService executor = Executors.newCachedThreadPool();
@Before
public void setUp() {
@@ -73,31 +75,106 @@
@Test
public void testOverloadingServer() throws Exception {
WebManager webMgr = new WebManager();
+ int numExecutors = 16;
+ int serverQueueSize = 16;
+ int numRequests = 48;
HttpServer server =
- new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE);
- SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH });
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+ SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
server.addServlet(servlet);
webMgr.add(server);
webMgr.start();
+ int expectedSuccess = numExecutors + serverQueueSize;
+ int expectedUnavailable = numRequests - expectedSuccess;
try {
- request(NUM_OF_REQUESTS);
- for (Thread thread : THREADS) {
- thread.join();
+ request(expectedSuccess);
+ waitTillQueued(server, serverQueueSize);
+ ArrayList<Future<Void>> successSet = started();
+ request(expectedUnavailable);
+ ArrayList<Future<Void>> rejectedSet = started();
+ for (Future<Void> f : rejectedSet) {
+ f.get();
}
- Assert.assertEquals(32, SUCCESS_COUNT.get());
- Assert.assertEquals(16, UNAVAILABLE_COUNT.get());
+ servlet.wakeUp();
+ for (Future<Void> f : successSet) {
+ f.get();
+ }
+ Assert.assertEquals(expectedSuccess, SUCCESS_COUNT.get());
+ Assert.assertEquals(expectedUnavailable, UNAVAILABLE_COUNT.get());
Assert.assertEquals(0, OTHER_COUNT.get());
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
} finally {
webMgr.stop();
}
}
+ private void waitTillQueued(HttpServer server, int expectedQueued) throws Exception {
+ int maxAttempts = 5;
+ int attempt = 0;
+ int queued = server.getWorkQueueSize();
+ while (queued != expectedQueued) {
+ attempt++;
+ if (attempt > maxAttempts) {
+ throw new Exception("Number of queued requests (" + queued + ") didn't match the expected number ("
+ + expectedQueued + ")");
+ }
+ Thread.sleep(1000); // NOSONAR polling is the clean way
+ queued = server.getWorkQueueSize();
+ }
+ }
+
+ @Test
+ public void testReleaseRejectedRequest() throws Exception {
+ WebManager webMgr = new WebManager();
+ int numRequests = 64;
+ int numExecutors = 2;
+ int serverQueueSize = 2;
+ int numPatches = 60;
+ HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+ SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ request(numExecutors + serverQueueSize);
+ ArrayList<Future<Void>> stuck = started();
+ waitTillQueued(server, serverQueueSize);
+ try {
+ try {
+ for (int i = 0; i < numPatches; i++) {
+ ChattyServlet.printMemUsage();
+ request(numRequests);
+ for (Future<Void> f : FUTURES) {
+ f.get();
+ }
+ FUTURES.clear();
+ }
+ } finally {
+ ChattyServlet.printMemUsage();
+ servlet.wakeUp();
+ for (Future<Void> f : stuck) {
+ f.get();
+ }
+ }
+ } finally {
+ webMgr.stop();
+ }
+ }
+
+ private ArrayList<Future<Void>> started() {
+ ArrayList<Future<Void>> started = new ArrayList<>(FUTURES);
+ FUTURES.clear();
+ return started;
+ }
+
@Test
public void testChattyServer() throws Exception {
- ChattyServlet.printMemUsage();
int numRequests = 64;
int numExecutors = 32;
int serverQueueSize = 32;
+ ChattyServlet.printMemUsage();
WebManager webMgr = new WebManager();
HttpServer server =
new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
@@ -107,8 +184,8 @@
webMgr.start();
try {
request(numRequests);
- for (Thread thread : THREADS) {
- thread.join();
+ for (Future<Void> thread : FUTURES) {
+ thread.get();
}
Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
@@ -120,10 +197,12 @@
@Test
public void testMalformedString() throws Exception {
+ int numExecutors = 16;
+ int serverQueueSize = 16;
WebManager webMgr = new WebManager();
HttpServer server =
- new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, NUM_EXECUTOR_THREADS, SERVER_QUEUE_SIZE);
- SlowServlet servlet = new SlowServlet(server.ctx(), new String[] { PATH });
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+ SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
server.addServlet(servlet);
webMgr.add(server);
webMgr.start();
@@ -160,9 +239,9 @@
private void request(int count) {
for (int i = 0; i < count; i++) {
- Thread next = new Thread(() -> {
+ Future<Void> next = executor.submit(() -> {
try {
- HttpUriRequest request = request(null);
+ HttpUriRequest request = post(null);
HttpResponse response = executeHttpRequest(request);
if (response.getStatusLine().getStatusCode() == HttpResponseStatus.OK.code()) {
SUCCESS_COUNT.incrementAndGet();
@@ -183,10 +262,11 @@
IOUtils.closeQuietly(in);
} catch (Throwable th) {
th.printStackTrace();
+ throw th;
}
+ return null;
});
- THREADS.add(next);
- next.start();
+ FUTURES.add(next);
}
}
@@ -200,10 +280,26 @@
}
}
- protected HttpUriRequest request(String query) throws URISyntaxException {
+ protected HttpUriRequest get(String query) throws URISyntaxException {
URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
RequestBuilder builder = RequestBuilder.get(uri);
builder.setCharset(StandardCharsets.UTF_8);
return builder.build();
}
+
+ protected HttpUriRequest post(String query) throws URISyntaxException {
+ URI uri = new URI(PROTOCOL, null, HOST, PORT, PATH, query, null);
+ RequestBuilder builder = RequestBuilder.post(uri);
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < 32; i++) {
+ str.append("This is a string statement that will be ignored");
+ str.append('\n');
+ }
+ String statement = str.toString();
+ builder.setHeader("Content-type", "application/x-www-form-urlencoded");
+ builder.addParameter("statement", statement);
+ builder.setEntity(new StringEntity(statement, StandardCharsets.UTF_8));
+ builder.setCharset(StandardCharsets.UTF_8);
+ return builder.build();
+ }
}