[ASTERIXDB-2475][OTH] Reject HTTP Pipelined Requests
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- If a client sends multiple requests on the same connection
before reading the response of each request (i.e. pipelined
requests), the request will be rejected and the connection
will be closed.
- Add test case.
- Fix typo in method name.
Change-Id: I67c370d4d37a3e267b30e13333714605b07b7515
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3021
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 7b1dc63..5d56c5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -62,6 +62,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ <version>4.4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index adea133..4f7133a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -90,7 +90,7 @@
}
}
} else {
- response.fullReponse(buffer);
+ response.fullResponse(buffer);
}
super.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index b3a7587..0aadb1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -31,12 +31,14 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
@@ -63,13 +65,16 @@
private final ChannelHandlerContext ctx;
private final ChunkedNettyOutputStream outputStream;
private final PrintWriter writer;
+ private final HttpServerHandler<?> handler;
private DefaultHttpResponse response;
private boolean headerSent;
private ByteBuf error;
private ChannelFuture future;
private boolean done;
- public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
+ public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request,
+ int chunkSize) {
+ this.handler = handler;
this.ctx = ctx;
outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
writer = new PrintWriter(outputStream);
@@ -102,7 +107,7 @@
writer.close();
if (error == null && response.status() == HttpResponseStatus.OK) {
if (!done) {
- future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ respond(LastHttpContent.EMPTY_LAST_CONTENT);
}
} else {
// There was an error
@@ -111,7 +116,7 @@
if (error != null) {
error.release();
}
- future = ctx.channel().close();
+ future = ctx.channel().close().addListener(handler);
} else {
// we didn't send anything to the user, we need to send an non-chunked error response
fullResponse(response.protocolVersion(), response.status(),
@@ -155,7 +160,7 @@
return headerSent;
}
- public void fullReponse(ByteBuf buffer) {
+ public void fullResponse(ByteBuf buffer) {
fullResponse(response.protocolVersion(), response.status(), buffer, response.headers());
}
@@ -165,7 +170,7 @@
// for a full response remove chunked transfer-encoding and set the content length instead
fullResponse.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
- future = ctx.writeAndFlush(fullResponse);
+ respond(fullResponse);
headerSent = true;
done = true;
}
@@ -184,4 +189,10 @@
public void cancel() {
outputStream.cancel();
}
+
+ private void respond(HttpObject response) {
+ final ChannelPromise responseCompletionPromise = ctx.newPromise();
+ responseCompletionPromise.addListener(handler);
+ future = ctx.writeAndFlush(response, responseCompletionPromise);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 55bbd30..2b1e8b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -29,6 +29,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@@ -41,9 +42,11 @@
private final ByteArrayOutputStream baos;
private final PrintWriter writer;
private final DefaultFullHttpResponse response;
+ private final HttpServerHandler<?> handler;
private ChannelFuture future;
- public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
+ public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request) {
+ this.handler = handler;
this.ctx = ctx;
baos = new ByteArrayOutputStream();
writer = new PrintWriter(baos);
@@ -56,7 +59,9 @@
writer.close();
FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray()));
fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
- future = ctx.writeAndFlush(fullResponse);
+ final ChannelPromise responseCompletionPromise = ctx.newPromise();
+ responseCompletionPromise.addListener(handler);
+ future = ctx.writeAndFlush(fullResponse, responseCompletionPromise);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index 1c0801c..652be7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -40,15 +40,18 @@
private final IServlet servlet;
private final IServletRequest request;
private final IServletResponse response;
+ private final HttpServerHandler<?> handler;
private boolean started = false;
private boolean cancelled = false;
- public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, IServletRequest request, int chunkSize) {
+ public HttpRequestHandler(HttpServerHandler<?> handler, ChannelHandlerContext ctx, IServlet servlet,
+ IServletRequest request, int chunkSize) {
+ this.handler = handler;
this.ctx = ctx;
this.servlet = servlet;
this.request = request;
- response = chunkSize == 0 ? new FullResponse(ctx, request.getHttpRequest())
- : new ChunkedResponse(ctx, request.getHttpRequest(), chunkSize);
+ response = chunkSize == 0 ? new FullResponse(handler, ctx, request.getHttpRequest())
+ : new ChunkedResponse(handler, ctx, request.getHttpRequest(), chunkSize);
request.getHttpRequest().retain();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 36d79f3..e3a0d4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -33,23 +33,28 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
-public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object>
+ implements ChannelFutureListener {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final String PIPELINED_REQUEST_ERROR_MSG = "Server doesn't support pipelined requests";
protected final T server;
- protected final int chunkSize;
- protected HttpRequestHandler handler;
- protected IChannelClosedHandler closeHandler;
- protected Future<Void> task;
- protected IServlet servlet;
+ protected volatile HttpRequestHandler handler;
+ protected volatile Future<Void> task;
+ protected volatile IServlet servlet;
+ private volatile IChannelClosedHandler closeHandler;
+ private volatile boolean pipelinedRequest = false;
+ private final int chunkSize;
public HttpServerHandler(T server, int chunkSize) {
this.server = server;
@@ -63,19 +68,24 @@
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- if (ctx.channel().isWritable()) {
- handler.notifyChannelWritable();
+ final HttpRequestHandler currentHandler = handler;
+ if (currentHandler != null && ctx.channel().isWritable()) {
+ currentHandler.notifyChannelWritable();
}
super.channelWritabilityChanged(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- if (handler != null) {
- handler.notifyChannelInactive();
+ final HttpRequestHandler currentHandler = handler;
+ if (currentHandler != null) {
+ currentHandler.notifyChannelInactive();
}
- if (closeHandler != null) {
- closeHandler.channelClosed(server, servlet, task);
+ final IChannelClosedHandler currentCloseHandler = closeHandler;
+ final IServlet currentServlet = servlet;
+ final Future<Void> currentTask = task;
+ if (currentCloseHandler != null && currentServlet != null && currentTask != null) {
+ currentCloseHandler.channelClosed(server, currentServlet, currentTask);
}
super.channelInactive(ctx);
}
@@ -83,9 +93,15 @@
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest request = (FullHttpRequest) msg;
- handler = null;
- task = null;
- closeHandler = null;
+ if (isPipelinedRequest()) {
+ pipelinedRequest = true;
+ rejectPipelinedRequestAndClose(ctx, request);
+ return;
+ }
+ if (request.decoderResult().isFailure()) {
+ respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
try {
servlet = server.getServlet(request);
if (servlet == null) {
@@ -94,7 +110,7 @@
submit(ctx, servlet, request);
}
} catch (Exception e) {
- LOGGER.log(Level.WARN, "Failure Submitting HTTP Request", e);
+ LOGGER.log(Level.WARN, "Failure handling HTTP request", e);
respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@@ -103,7 +119,9 @@
final DefaultHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status);
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
HttpUtil.setConnectionHeader(request, response);
- final ChannelFuture clientChannel = ctx.writeAndFlush(response);
+ final ChannelPromise responseCompletionPromise = ctx.newPromise();
+ responseCompletionPromise.addListener(this);
+ final ChannelFuture clientChannel = ctx.writeAndFlush(response, responseCompletionPromise);
if (!io.netty.handler.codec.http.HttpUtil.isKeepAlive(request)) {
clientChannel.addListener(ChannelFutureListener.CLOSE);
}
@@ -118,7 +136,7 @@
respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
return;
}
- handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
+ handler = new HttpRequestHandler(this, ctx, servlet, servletRequest, chunkSize);
submit(servlet);
}
@@ -144,4 +162,29 @@
LOGGER.log(Level.WARN, "Failure handling HTTP Request", cause);
ctx.close();
}
-}
+
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (!pipelinedRequest) {
+ requestHandled();
+ }
+ }
+
+ private boolean isPipelinedRequest() {
+ return handler != null || servlet != null || closeHandler != null || task != null;
+ }
+
+ private void rejectPipelinedRequestAndClose(ChannelHandlerContext ctx, FullHttpRequest request) {
+ LOGGER.warn(PIPELINED_REQUEST_ERROR_MSG);
+ request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+ respond(ctx, request,
+ new HttpResponseStatus(HttpResponseStatus.BAD_REQUEST.code(), PIPELINED_REQUEST_ERROR_MSG));
+ }
+
+ private void requestHandled() {
+ handler = null;
+ servlet = null;
+ task = null;
+ closeHandler = null;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
new file mode 100644
index 0000000..0c96eb6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
+import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequester;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.HttpServerConfig;
+import org.apache.hyracks.http.server.HttpServerConfigBuilder;
+import org.apache.hyracks.http.server.InterruptOnCloseHandler;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.servlet.SleepyServlet;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PipelinedRequestsTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int PORT = 9898;
+ private static final String PATH = "/";
+
+ @Test
+ public void pipelinedRequests() throws Exception {
+ setupServer();
+ final HttpHost target = new HttpHost("localhost", PORT);
+ final List<BasicAsyncRequestProducer> requestProducers =
+ Arrays.asList(new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)),
+ new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)));
+ final List<BasicAsyncResponseConsumer> responseConsumers =
+ Arrays.asList(new BasicAsyncResponseConsumer(), new BasicAsyncResponseConsumer());
+ final List<HttpResponse> httpResponses = executePipelined(target, requestProducers, responseConsumers);
+ for (HttpResponse response : httpResponses) {
+ Assert.assertNotEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
+ }
+ }
+
+ private void setupServer() throws Exception {
+ final WebManager webMgr = new WebManager();
+ final HttpServerConfig config =
+ HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
+ final HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config, InterruptOnCloseHandler.INSTANCE);
+ final SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ }
+
+ private List<HttpResponse> executePipelined(HttpHost host, List<BasicAsyncRequestProducer> requestProducers,
+ List<BasicAsyncResponseConsumer> responseConsumers) throws Exception {
+ final List<HttpResponse> results = new ArrayList<>();
+ final HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
+ final IOEventDispatch ioEventDispatch =
+ new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);
+ final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
+ final BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT);
+ pool.setDefaultMaxPerRoute(1);
+ pool.setMaxTotal(1);
+ final Thread reactorThread = new Thread(() -> {
+ try {
+ ioReactor.execute(ioEventDispatch);
+ } catch (final IOException e) {
+ LOGGER.error(e);
+ }
+ });
+ reactorThread.start();
+ final HttpCoreContext context = HttpCoreContext.create();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final HttpProcessor httpProc =
+ HttpProcessorBuilder.create().add(new RequestContent()).add(new RequestTargetHost())
+ .add(new RequestConnControl()).add(new RequestExpectContinue(true)).build();
+ final HttpAsyncRequester requester = new HttpAsyncRequester(httpProc);
+ requester.executePipelined(host, requestProducers, responseConsumers, pool, context,
+ new FutureCallback<List<HttpResponse>>() {
+ @Override
+ public void completed(final List<HttpResponse> result) {
+ results.addAll(result);
+ latch.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ latch.countDown();
+ }
+
+ @Override
+ public void cancelled() {
+ latch.countDown();
+ }
+ });
+ latch.await(5, TimeUnit.SECONDS);
+ ioReactor.shutdown();
+ return results;
+ }
+}