[ASTERIXDB-2475][OTH] Reject HTTP Pipelined Requests

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- If a client sends multiple requests on the same connection
  before reading the response of each request (i.e. pipelined
  requests), the request will be rejected and the connection
  will be closed.
- Add test case.
- Fix typo in method name.

Change-Id: I67c370d4d37a3e267b30e13333714605b07b7515
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3021
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index 7b1dc63..5d56c5e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -62,6 +62,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore-nio</artifactId>
+      <version>4.4.10</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
index adea133..4f7133a 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedNettyOutputStream.java
@@ -90,7 +90,7 @@
                     }
                 }
             } else {
-                response.fullReponse(buffer);
+                response.fullResponse(buffer);
             }
             super.close();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index b3a7587..0aadb1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -31,12 +31,14 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObject;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
@@ -63,13 +65,16 @@
     private final ChannelHandlerContext ctx;
     private final ChunkedNettyOutputStream outputStream;
     private final PrintWriter writer;
+    private final HttpServerHandler<?> handler;
     private DefaultHttpResponse response;
     private boolean headerSent;
     private ByteBuf error;
     private ChannelFuture future;
     private boolean done;
 
-    public ChunkedResponse(ChannelHandlerContext ctx, FullHttpRequest request, int chunkSize) {
+    public ChunkedResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request,
+            int chunkSize) {
+        this.handler = handler;
         this.ctx = ctx;
         outputStream = new ChunkedNettyOutputStream(ctx, chunkSize, this);
         writer = new PrintWriter(outputStream);
@@ -102,7 +107,7 @@
         writer.close();
         if (error == null && response.status() == HttpResponseStatus.OK) {
             if (!done) {
-                future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+                respond(LastHttpContent.EMPTY_LAST_CONTENT);
             }
         } else {
             // There was an error
@@ -111,7 +116,7 @@
                 if (error != null) {
                     error.release();
                 }
-                future = ctx.channel().close();
+                future = ctx.channel().close().addListener(handler);
             } else {
                 // we didn't send anything to the user, we need to send an non-chunked error response
                 fullResponse(response.protocolVersion(), response.status(),
@@ -155,7 +160,7 @@
         return headerSent;
     }
 
-    public void fullReponse(ByteBuf buffer) {
+    public void fullResponse(ByteBuf buffer) {
         fullResponse(response.protocolVersion(), response.status(), buffer, response.headers());
     }
 
@@ -165,7 +170,7 @@
         // for a full response remove chunked transfer-encoding and set the content length instead
         fullResponse.headers().remove(HttpHeaderNames.TRANSFER_ENCODING);
         fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
-        future = ctx.writeAndFlush(fullResponse);
+        respond(fullResponse);
         headerSent = true;
         done = true;
     }
@@ -184,4 +189,10 @@
     public void cancel() {
         outputStream.cancel();
     }
+
+    private void respond(HttpObject response) {
+        final ChannelPromise responseCompletionPromise = ctx.newPromise();
+        responseCompletionPromise.addListener(handler);
+        future = ctx.writeAndFlush(response, responseCompletionPromise);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
index 55bbd30..2b1e8b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/FullResponse.java
@@ -29,6 +29,7 @@
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
@@ -41,9 +42,11 @@
     private final ByteArrayOutputStream baos;
     private final PrintWriter writer;
     private final DefaultFullHttpResponse response;
+    private final HttpServerHandler<?> handler;
     private ChannelFuture future;
 
-    public FullResponse(ChannelHandlerContext ctx, FullHttpRequest request) {
+    public FullResponse(HttpServerHandler<?> handler, ChannelHandlerContext ctx, FullHttpRequest request) {
+        this.handler = handler;
         this.ctx = ctx;
         baos = new ByteArrayOutputStream();
         writer = new PrintWriter(baos);
@@ -56,7 +59,9 @@
         writer.close();
         FullHttpResponse fullResponse = response.replace(Unpooled.copiedBuffer(baos.toByteArray()));
         fullResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, fullResponse.content().readableBytes());
-        future = ctx.writeAndFlush(fullResponse);
+        final ChannelPromise responseCompletionPromise = ctx.newPromise();
+        responseCompletionPromise.addListener(handler);
+        future = ctx.writeAndFlush(fullResponse, responseCompletionPromise);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
index 1c0801c..652be7c 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpRequestHandler.java
@@ -40,15 +40,18 @@
     private final IServlet servlet;
     private final IServletRequest request;
     private final IServletResponse response;
+    private final HttpServerHandler<?> handler;
     private boolean started = false;
     private boolean cancelled = false;
 
-    public HttpRequestHandler(ChannelHandlerContext ctx, IServlet servlet, IServletRequest request, int chunkSize) {
+    public HttpRequestHandler(HttpServerHandler<?> handler, ChannelHandlerContext ctx, IServlet servlet,
+            IServletRequest request, int chunkSize) {
+        this.handler = handler;
         this.ctx = ctx;
         this.servlet = servlet;
         this.request = request;
-        response = chunkSize == 0 ? new FullResponse(ctx, request.getHttpRequest())
-                : new ChunkedResponse(ctx, request.getHttpRequest(), chunkSize);
+        response = chunkSize == 0 ? new FullResponse(handler, ctx, request.getHttpRequest())
+                : new ChunkedResponse(handler, ctx, request.getHttpRequest(), chunkSize);
         request.getHttpRequest().retain();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 36d79f3..e3a0d4b 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -33,23 +33,28 @@
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
-public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object> {
+public class HttpServerHandler<T extends HttpServer> extends SimpleChannelInboundHandler<Object>
+        implements ChannelFutureListener {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final String PIPELINED_REQUEST_ERROR_MSG = "Server doesn't support pipelined requests";
     protected final T server;
-    protected final int chunkSize;
-    protected HttpRequestHandler handler;
-    protected IChannelClosedHandler closeHandler;
-    protected Future<Void> task;
-    protected IServlet servlet;
+    protected volatile HttpRequestHandler handler;
+    protected volatile Future<Void> task;
+    protected volatile IServlet servlet;
+    private volatile IChannelClosedHandler closeHandler;
+    private volatile boolean pipelinedRequest = false;
+    private final int chunkSize;
 
     public HttpServerHandler(T server, int chunkSize) {
         this.server = server;
@@ -63,19 +68,24 @@
 
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-        if (ctx.channel().isWritable()) {
-            handler.notifyChannelWritable();
+        final HttpRequestHandler currentHandler = handler;
+        if (currentHandler != null && ctx.channel().isWritable()) {
+            currentHandler.notifyChannelWritable();
         }
         super.channelWritabilityChanged(ctx);
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        if (handler != null) {
-            handler.notifyChannelInactive();
+        final HttpRequestHandler currentHandler = handler;
+        if (currentHandler != null) {
+            currentHandler.notifyChannelInactive();
         }
-        if (closeHandler != null) {
-            closeHandler.channelClosed(server, servlet, task);
+        final IChannelClosedHandler currentCloseHandler = closeHandler;
+        final IServlet currentServlet = servlet;
+        final Future<Void> currentTask = task;
+        if (currentCloseHandler != null && currentServlet != null && currentTask != null) {
+            currentCloseHandler.channelClosed(server, currentServlet, currentTask);
         }
         super.channelInactive(ctx);
     }
@@ -83,9 +93,15 @@
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
         FullHttpRequest request = (FullHttpRequest) msg;
-        handler = null;
-        task = null;
-        closeHandler = null;
+        if (isPipelinedRequest()) {
+            pipelinedRequest = true;
+            rejectPipelinedRequestAndClose(ctx, request);
+            return;
+        }
+        if (request.decoderResult().isFailure()) {
+            respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
         try {
             servlet = server.getServlet(request);
             if (servlet == null) {
@@ -94,7 +110,7 @@
                 submit(ctx, servlet, request);
             }
         } catch (Exception e) {
-            LOGGER.log(Level.WARN, "Failure Submitting HTTP Request", e);
+            LOGGER.log(Level.WARN, "Failure handling HTTP request", e);
             respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
         }
     }
@@ -103,7 +119,9 @@
         final DefaultHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(), status);
         response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
         HttpUtil.setConnectionHeader(request, response);
-        final ChannelFuture clientChannel = ctx.writeAndFlush(response);
+        final ChannelPromise responseCompletionPromise = ctx.newPromise();
+        responseCompletionPromise.addListener(this);
+        final ChannelFuture clientChannel = ctx.writeAndFlush(response, responseCompletionPromise);
         if (!io.netty.handler.codec.http.HttpUtil.isKeepAlive(request)) {
             clientChannel.addListener(ChannelFutureListener.CLOSE);
         }
@@ -118,7 +136,7 @@
             respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        handler = new HttpRequestHandler(ctx, servlet, servletRequest, chunkSize);
+        handler = new HttpRequestHandler(this, ctx, servlet, servletRequest, chunkSize);
         submit(servlet);
     }
 
@@ -144,4 +162,29 @@
         LOGGER.log(Level.WARN, "Failure handling HTTP Request", cause);
         ctx.close();
     }
-}
+
+    @Override
+    public void operationComplete(ChannelFuture future) {
+        if (!pipelinedRequest) {
+            requestHandled();
+        }
+    }
+
+    private boolean isPipelinedRequest() {
+        return handler != null || servlet != null || closeHandler != null || task != null;
+    }
+
+    private void rejectPipelinedRequestAndClose(ChannelHandlerContext ctx, FullHttpRequest request) {
+        LOGGER.warn(PIPELINED_REQUEST_ERROR_MSG);
+        request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
+        respond(ctx, request,
+                new HttpResponseStatus(HttpResponseStatus.BAD_REQUEST.code(), PIPELINED_REQUEST_ERROR_MSG));
+    }
+
+    private void requestHandled() {
+        handler = null;
+        servlet = null;
+        task = null;
+        closeHandler = null;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
new file mode 100644
index 0000000..0c96eb6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/PipelinedRequestsTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.http;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnPool;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.message.BasicHttpRequest;
+import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
+import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.nio.protocol.HttpAsyncRequester;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.hyracks.http.server.HttpServer;
+import org.apache.hyracks.http.server.HttpServerConfig;
+import org.apache.hyracks.http.server.HttpServerConfigBuilder;
+import org.apache.hyracks.http.server.InterruptOnCloseHandler;
+import org.apache.hyracks.http.server.WebManager;
+import org.apache.hyracks.http.servlet.SleepyServlet;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PipelinedRequestsTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int PORT = 9898;
+    private static final String PATH = "/";
+
+    @Test
+    public void pipelinedRequests() throws Exception {
+        setupServer();
+        final HttpHost target = new HttpHost("localhost", PORT);
+        final List<BasicAsyncRequestProducer> requestProducers =
+                Arrays.asList(new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)),
+                        new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", PATH)));
+        final List<BasicAsyncResponseConsumer> responseConsumers =
+                Arrays.asList(new BasicAsyncResponseConsumer(), new BasicAsyncResponseConsumer());
+        final List<HttpResponse> httpResponses = executePipelined(target, requestProducers, responseConsumers);
+        for (HttpResponse response : httpResponses) {
+            Assert.assertNotEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
+        }
+    }
+
+    private void setupServer() throws Exception {
+        final WebManager webMgr = new WebManager();
+        final HttpServerConfig config =
+                HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
+        final HttpServer server =
+                new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config, InterruptOnCloseHandler.INSTANCE);
+        final SleepyServlet servlet = new SleepyServlet(server.ctx(), new String[] { PATH });
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+    }
+
+    private List<HttpResponse> executePipelined(HttpHost host, List<BasicAsyncRequestProducer> requestProducers,
+            List<BasicAsyncResponseConsumer> responseConsumers) throws Exception {
+        final List<HttpResponse> results = new ArrayList<>();
+        final HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
+        final IOEventDispatch ioEventDispatch =
+                new DefaultHttpClientIODispatch(protocolHandler, ConnectionConfig.DEFAULT);
+        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
+        final BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, ConnectionConfig.DEFAULT);
+        pool.setDefaultMaxPerRoute(1);
+        pool.setMaxTotal(1);
+        final Thread reactorThread = new Thread(() -> {
+            try {
+                ioReactor.execute(ioEventDispatch);
+            } catch (final IOException e) {
+                LOGGER.error(e);
+            }
+        });
+        reactorThread.start();
+        final HttpCoreContext context = HttpCoreContext.create();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final HttpProcessor httpProc =
+                HttpProcessorBuilder.create().add(new RequestContent()).add(new RequestTargetHost())
+                        .add(new RequestConnControl()).add(new RequestExpectContinue(true)).build();
+        final HttpAsyncRequester requester = new HttpAsyncRequester(httpProc);
+        requester.executePipelined(host, requestProducers, responseConsumers, pool, context,
+                new FutureCallback<List<HttpResponse>>() {
+                    @Override
+                    public void completed(final List<HttpResponse> result) {
+                        results.addAll(result);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void failed(final Exception ex) {
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void cancelled() {
+                        latch.countDown();
+                    }
+                });
+        latch.await(5, TimeUnit.SECONDS);
+        ioReactor.shutdown();
+        return results;
+    }
+}