[ASTERIXDB-2033][OTH] Restrict chunk size in HTTP server
- user model changes: no
- storage format changes: no
- interface changes: no
details:
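- This change ensures no large chunks are written to
direct buffers in the HTTP server: ChunkedNettyOutputStream.write
now copies data into its existing buffer piece by piece, flushing
whenever the buffer fills, instead of allocating a new buffer
sized to the whole write.
- AbstractServlet now logs unhandled Throwables before rethrowing them.
- Tests run with -XX:MaxDirectMemorySize=16M, and a new ChattyServlet
test exercises large responses under concurrent requests.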
Change-Id: I08bac47ea28f66502b99df6fb8ff91dd85566d38
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1935
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 835cf61..6439adb 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -16,7 +16,8 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hyracks</groupId>
@@ -27,6 +28,17 @@
<properties>
<root.dir>${basedir}/../..</root.dir>
</properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>-XX:MaxDirectMemorySize=16M</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index 4a4c25a..30d599d 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -88,6 +88,12 @@
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Unhandled exception", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ } catch (Throwable th) { //NOSONAR Just logging and then throwing again
+ try {
+ LOGGER.log(Level.WARNING, "Unhandled throwable", th);
+ } catch (Throwable loggingFailure) {// NOSONAR... swallow logging failure
+ }
+ throw th;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index 3456343..0066b77 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -27,6 +27,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.internal.OutOfDirectMemoryError;
public class ChunkedNettyOutputStream extends OutputStream {
@@ -50,23 +51,25 @@
@Override
public void write(byte[] b, int off, int len) throws IOException {
- if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return;
- }
- if (len > buffer.capacity()) {
- flush();
- flush(b, off, len);
- } else {
- int space = buffer.writableBytes();
- if (space >= len) {
- buffer.writeBytes(b, off, len);
- } else {
- buffer.writeBytes(b, off, space);
- flush();
- buffer.writeBytes(b, off + space, len - space);
+ try {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)) {
+ throw new IndexOutOfBoundsException();
}
+ while (len > 0) {
+ int space = buffer.writableBytes();
+ if (space >= len) {
+ buffer.writeBytes(b, off, len);
+ len = 0; // NOSONAR
+ } else {
+ buffer.writeBytes(b, off, space);
+ len -= space; // NOSONAR
+ off += space; // NOSONAR
+ flush();
+ }
+ }
+ } catch (OutOfDirectMemoryError error) {// NOSONAR
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ throw error;
}
}
@@ -114,18 +117,6 @@
}
}
- private void flush(byte[] buf, int offset, int len) throws IOException {
- ensureWritable();
- ByteBuf aBuffer = ctx.alloc().buffer(len);
- aBuffer.writeBytes(buf, offset, len);
- if (response.status() == HttpResponseStatus.OK) {
- response.beforeFlush();
- ctx.write(new DefaultHttpContent(aBuffer), ctx.channel().voidPromise());
- } else {
- response.error(aBuffer);
- }
- }
-
private synchronized void ensureWritable() throws IOException {
while (!ctx.channel().isWritable()) {
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
new file mode 100644
index 0000000..30df003
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/servlet/ChattyServlet.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http.servlet;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class ChattyServlet extends AbstractServlet {
+ private static final Logger LOGGER = Logger.getLogger(ChattyServlet.class.getName());
+ private byte[] bytes;
+
+ public ChattyServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+ super(ctx, paths);
+ String line = "I don't know when to stop talking\n";
+ StringBuilder responseBuilder = new StringBuilder();
+ for (int i = 0; i < 100000; i++) {
+ responseBuilder.append(line);
+ }
+ String responseString = responseBuilder.toString();
+ bytes = responseString.getBytes();
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ response.setStatus(HttpResponseStatus.OK);
+ HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+ LOGGER.log(Level.WARNING, "I am about to flood you... and a single buffer is " + bytes.length + " bytes");
+ for (int i = 0; i < 100; i++) {
+ response.outputStream().write(bytes);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 2076201..854980e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -41,13 +41,16 @@
import org.apache.http.impl.client.StandardHttpRequestRetryHandler;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.servlet.ChattyServlet;
import org.apache.hyracks.http.servlet.SlowServlet;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import io.netty.handler.codec.http.HttpResponseStatus;
public class HttpServerTest {
+ static final boolean PRINT_TO_CONSOLE = false;
static final int PORT = 9898;
static final int NUM_EXECUTOR_THREADS = 16;
static final int SERVER_QUEUE_SIZE = 16;
@@ -60,6 +63,13 @@
static final AtomicInteger OTHER_COUNT = new AtomicInteger();
static final List<Thread> THREADS = new ArrayList<>();
+ @Before
+ public void setUp() {
+ SUCCESS_COUNT.set(0);
+ UNAVAILABLE_COUNT.set(0);
+ OTHER_COUNT.set(0);
+ }
+
@Test
public void testOverloadingServer() throws Exception {
WebManager webMgr = new WebManager();
@@ -83,6 +93,31 @@
}
@Test
+ public void testChattyServer() throws Exception {
+ int numRequests = 64;
+ int numExecutors = 32;
+ int serverQueueSize = 32;
+ WebManager webMgr = new WebManager();
+ HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+ ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ request(numRequests);
+ for (Thread thread : THREADS) {
+ thread.join();
+ }
+ Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
+ Assert.assertEquals(0, UNAVAILABLE_COUNT.get());
+ Assert.assertEquals(0, OTHER_COUNT.get());
+ } finally {
+ webMgr.stop();
+ }
+ }
+
+ @Test
public void testMalformedString() throws Exception {
WebManager webMgr = new WebManager();
HttpServer server =
@@ -136,8 +171,15 @@
} else {
OTHER_COUNT.incrementAndGet();
}
- InputStream responseStream = response.getEntity().getContent();
- IOUtils.closeQuietly(responseStream);
+ InputStream in = response.getEntity().getContent();
+ if (PRINT_TO_CONSOLE) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ IOUtils.closeQuietly(in);
} catch (Throwable th) {
th.printStackTrace();
}