[NO ISSUE][OTH] Support multiple addresses in http servers
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Allow binding http servers to multiple addresses.
- Add test cases.
Change-Id: I68f25dc5af471c7ded29f27405c311947a007947
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9624
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 93762a6..97b8859 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +42,7 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -50,6 +54,8 @@
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
public class HttpServer {
// Constants
@@ -64,6 +70,7 @@
private static final int STARTING = 1;
private static final int STARTED = 2;
private static final int STOPPING = 3;
+ private static final int RECOVERING = 4;
// Final members
private final IChannelClosedHandler closedHandler;
private final Object lock = new Object();
@@ -73,38 +80,59 @@
private final ServletRegistry servlets;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
- private final InetSocketAddress address;
+ private final InetSocketAddress defaultAddress;
+ private final List<InetSocketAddress> addresses;
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
private volatile Thread recoveryThread;
- private volatile Channel channel;
+ private final List<Channel> channels;
private Throwable cause;
private HttpServerConfig config;
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) {
- this(bossGroup, workerGroup, new InetSocketAddress(port), config, null);
- }
+ private final GenericFutureListener<Future<Void>> channelCloseListener = f -> {
+ // This listener is invoked from within a netty IO thread. Hence, we can never block it
+ // For simplicity, we will submit the recovery task to a different thread. We will also
+ // close all channels on this server and attempt to rebind them.
+ synchronized (lock) {
+ if (state != STARTED) {
+ return;
+ }
+ LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
+ MXHelper.logFileDescriptors();
+ state = RECOVERING;
+ triggerRecovery();
+ }
+ };
- public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
- HttpServerConfig config) {
- this(bossGroup, workerGroup, address, config, null);
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) {
+ this(bossGroup, workerGroup, Collections.singletonList(new InetSocketAddress(port)), config, null);
}
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
HttpServerConfig config, IChannelClosedHandler closeHandler) {
+ this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler);
+ }
+
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses,
+ HttpServerConfig config, IChannelClosedHandler closeHandler) {
+ if (addresses.isEmpty()) {
+ throw new IllegalArgumentException("no addresses specified");
+ }
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
- this.address = address;
+ this.addresses = addresses;
+ defaultAddress = addresses.get(0);
this.closedHandler = closeHandler;
this.config = config;
+ channels = new ArrayList<>();
ctx = new ConcurrentHashMap<>();
servlets = new ServletRegistry();
workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
int numExecutorThreads = config.getThreadCount();
executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
runnable -> new Thread(runnable,
- "HttpExecutor(port:" + address.getPort() + ")-" + threadId.getAndIncrement()));
+ "HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement()));
long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
+ numExecutorThreads * config.getMaxResponseChunkSize();
LOGGER.log(Level.DEBUG,
@@ -128,7 +156,7 @@
doStart();
setStarted();
} catch (Throwable e) { // NOSONAR
- LOGGER.error("Failure starting an Http Server at: {}", address, e);
+ LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e);
setFailed(e);
throw e;
}
@@ -175,6 +203,8 @@
return "STOPPING";
case STOPPED:
return "STOPPED";
+ case RECOVERING:
+ return "RECOVERING";
default:
return "UNKNOWN";
}
@@ -229,10 +259,10 @@
for (IServlet servlet : servlets.getServlets()) {
servlet.init();
}
- channel = bind();
+ bind();
}
- private Channel bind() throws InterruptedException {
+ private void bind() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -240,23 +270,20 @@
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
- Channel newChannel = b.bind(address).sync().channel();
- newChannel.closeFuture().addListener(f -> {
- // This listener is invoked from within a netty IO thread. Hence, we can never block it
- // For simplicity, we will submit the recovery task to a different thread
+ List<ChannelFuture> channelFutures = new ArrayList<>();
+ for (InetSocketAddress address : addresses) {
+ channelFutures.add(b.bind(address));
+ }
+ for (ChannelFuture future : channelFutures) {
+ Channel channel = future.sync().channel();
+ channel.closeFuture().addListener(channelCloseListener);
synchronized (lock) {
- if (state != STARTED) {
- return;
- }
- LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
- MXHelper.logFileDescriptors();
- triggerRecovery();
+ channels.add(channel);
}
- });
- return newChannel;
+ }
}
- private void triggerRecovery() {
+ private void triggerRecovery() throws InterruptedException {
Thread rt = recoveryThread;
if (rt != null) {
try {
@@ -267,7 +294,7 @@
return;
}
}
- // try to revive the channel
+ // try to revive the channels
recoveryThread = new Thread(this::recover);
recoveryThread.start();
}
@@ -275,9 +302,11 @@
public void recover() {
try {
synchronized (lock) {
- while (state == STARTED) {
+ while (state == RECOVERING) {
try {
- channel = bind();
+ closeChannels();
+ bind();
+ setStarted();
break;
} catch (InterruptedException e) {
LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e);
@@ -329,10 +358,7 @@
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Error while shutting down http server executor", e);
}
- if (channel != null) {
- channel.close();
- channel.closeFuture().sync();
- }
+ closeChannels();
}
public IServlet getServlet(FullHttpRequest request) {
@@ -369,8 +395,8 @@
@Override
public String toString() {
- return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + address + ",\"state\":\"" + getState()
- + "\"}";
+ return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\""
+ + getState() + "\"}";
}
public HttpServerConfig getConfig() {
@@ -378,6 +404,17 @@
}
public InetSocketAddress getAddress() {
- return address;
+ return defaultAddress;
+ }
+
+ private void closeChannels() throws InterruptedException {
+ synchronized (lock) {
+ for (Channel channel : channels) {
+ channel.closeFuture().removeListener(channelCloseListener);
+ channel.close();
+ channel.closeFuture().sync();
+ }
+ channels.clear();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
index 84c8c65..76438aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
@@ -28,6 +28,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -263,7 +264,10 @@
WebManager webMgr = new WebManager();
final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(numExecutors)
.setRequestQueueSize(serverQueueSize).build();
- HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config);
+ List<InetSocketAddress> addresses = new ArrayList<>();
+ addresses.add(new InetSocketAddress(PORT));
+ addresses.add(new InetSocketAddress(PORT + 1));
+ HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
server.addServlet(servlet);
webMgr.add(server);
@@ -276,12 +280,12 @@
}
Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
// close the channel
- Field channelField = server.getClass().getDeclaredField("channel");
+ Field channelField = server.getClass().getDeclaredField("channels");
channelField.setAccessible(true);
Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
recoveryThreadField.setAccessible(true);
- Channel channel = (Channel) channelField.get(server);
- channel.close();
+ List<Channel> channels = (ArrayList<Channel>) channelField.get(server);
+ channels.get(0).close();
Thread.sleep(1000);
final int sleeps = 10;
for (int i = 0; i < sleeps; i++) {
@@ -409,6 +413,43 @@
}
}
+ @Test
+ public void multiAddressServerTest() throws Exception {
+ final WebManager webMgr = new WebManager();
+ final HttpServerConfig config =
+ HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
+ List<Integer> ports = Arrays.asList(PORT, PORT + 1);
+ List<InetSocketAddress> addresses = new ArrayList<>();
+ for (Integer port : ports) {
+ addresses.add(new InetSocketAddress(port));
+ }
+ HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
+ EchoServlet servlet = new EchoServlet(server.ctx(), PATH);
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ for (Integer port : ports) {
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, port,
+ HttpServerTest.PATH, null, null);
+ final HttpPost postRequest = new HttpPost(uri);
+ final String requestBody = "test";
+ final StringEntity chunkedEntity = new StringEntity(requestBody);
+ chunkedEntity.setChunked(true);
+ postRequest.setEntity(chunkedEntity);
+ try (CloseableHttpResponse response = httpClient.execute(postRequest)) {
+ final String responseBody = EntityUtils.toString(response.getEntity());
+ Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpResponseStatus.OK.code());
+ Assert.assertEquals(responseBody, requestBody);
+ }
+ }
+ }
+ } finally {
+ webMgr.stop();
+ }
+ }
+
private void request(int count) throws URISyntaxException {
request(count, 0);
}