[NO ISSUE][OTH] Support multiple addresses in http servers

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Allow binding http servers to multiple addresses.
- Add test cases.

Change-Id: I68f25dc5af471c7ded29f27405c311947a007947
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9624
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 93762a6..97b8859 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -39,6 +42,7 @@
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -50,6 +54,8 @@
 import io.netty.handler.codec.http.HttpScheme;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 
 public class HttpServer {
     // Constants
@@ -64,6 +70,7 @@
     private static final int STARTING = 1;
     private static final int STARTED = 2;
     private static final int STOPPING = 3;
+    private static final int RECOVERING = 4;
     // Final members
     private final IChannelClosedHandler closedHandler;
     private final Object lock = new Object();
@@ -73,38 +80,59 @@
     private final ServletRegistry servlets;
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
-    private final InetSocketAddress address;
+    private final InetSocketAddress defaultAddress;
+    private final List<InetSocketAddress> addresses;
     private final ThreadPoolExecutor executor;
     // Mutable members
     private volatile int state = STOPPED;
     private volatile Thread recoveryThread;
-    private volatile Channel channel;
+    private final List<Channel> channels;
     private Throwable cause;
     private HttpServerConfig config;
 
-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) {
-        this(bossGroup, workerGroup, new InetSocketAddress(port), config, null);
-    }
+    private final GenericFutureListener<Future<Void>> channelCloseListener = f -> {
+        // This listener is invoked from within a netty IO thread. Hence, we can never block it
+        // For simplicity, we will submit the recovery task to a different thread. We will also
+        // close all channels on this server and attempt to rebind them.
+        synchronized (lock) {
+            if (state != STARTED) {
+                return;
+            }
+            LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
+            MXHelper.logFileDescriptors();
+            state = RECOVERING;
+            triggerRecovery();
+        }
+    };
 
-    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
-            HttpServerConfig config) {
-        this(bossGroup, workerGroup, address, config, null);
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port, HttpServerConfig config) {
+        this(bossGroup, workerGroup, Collections.singletonList(new InetSocketAddress(port)), config, null);
     }
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, InetSocketAddress address,
             HttpServerConfig config, IChannelClosedHandler closeHandler) {
+        this(bossGroup, workerGroup, Collections.singletonList(address), config, closeHandler);
+    }
+
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, List<InetSocketAddress> addresses,
+            HttpServerConfig config, IChannelClosedHandler closeHandler) {
+        if (addresses.isEmpty()) {
+            throw new IllegalArgumentException("no addresses specified");
+        }
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
-        this.address = address;
+        this.addresses = addresses;
+        defaultAddress = addresses.get(0);
         this.closedHandler = closeHandler;
         this.config = config;
+        channels = new ArrayList<>();
         ctx = new ConcurrentHashMap<>();
         servlets = new ServletRegistry();
         workQueue = new LinkedBlockingQueue<>(config.getRequestQueueSize());
         int numExecutorThreads = config.getThreadCount();
         executor = new ThreadPoolExecutor(numExecutorThreads, numExecutorThreads, 0L, TimeUnit.MILLISECONDS, workQueue,
                 runnable -> new Thread(runnable,
-                        "HttpExecutor(port:" + address.getPort() + ")-" + threadId.getAndIncrement()));
+                        "HttpExecutor(port:" + defaultAddress.getPort() + ")-" + threadId.getAndIncrement()));
         long directMemoryBudget = numExecutorThreads * (long) HIGH_WRITE_BUFFER_WATER_MARK
                 + numExecutorThreads * config.getMaxResponseChunkSize();
         LOGGER.log(Level.DEBUG,
@@ -128,7 +156,7 @@
                 doStart();
                 setStarted();
             } catch (Throwable e) { // NOSONAR
-                LOGGER.error("Failure starting an Http Server at: {}", address, e);
+                LOGGER.error("Failure starting an Http Server at: {}", defaultAddress, e);
                 setFailed(e);
                 throw e;
             }
@@ -175,6 +203,8 @@
                 return "STOPPING";
             case STOPPED:
                 return "STOPPED";
+            case RECOVERING:
+                return "RECOVERING";
             default:
                 return "UNKNOWN";
         }
@@ -229,10 +259,10 @@
         for (IServlet servlet : servlets.getServlets()) {
             servlet.init();
         }
-        channel = bind();
+        bind();
     }
 
-    private Channel bind() throws InterruptedException {
+    private void bind() throws InterruptedException {
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -240,23 +270,20 @@
                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(getChannelInitializer());
-        Channel newChannel = b.bind(address).sync().channel();
-        newChannel.closeFuture().addListener(f -> {
-            // This listener is invoked from within a netty IO thread. Hence, we can never block it
-            // For simplicity, we will submit the recovery task to a different thread
+        List<ChannelFuture> channelFutures = new ArrayList<>();
+        for (InetSocketAddress address : addresses) {
+            channelFutures.add(b.bind(address));
+        }
+        for (ChannelFuture future : channelFutures) {
+            Channel channel = future.sync().channel();
+            channel.closeFuture().addListener(channelCloseListener);
             synchronized (lock) {
-                if (state != STARTED) {
-                    return;
-                }
-                LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
-                MXHelper.logFileDescriptors();
-                triggerRecovery();
+                channels.add(channel);
             }
-        });
-        return newChannel;
+        }
     }
 
-    private void triggerRecovery() {
+    private void triggerRecovery() throws InterruptedException {
         Thread rt = recoveryThread;
         if (rt != null) {
             try {
@@ -267,7 +294,7 @@
                 return;
             }
         }
-        // try to revive the channel
+        // try to revive the channels
         recoveryThread = new Thread(this::recover);
         recoveryThread.start();
     }
@@ -275,9 +302,11 @@
     public void recover() {
         try {
             synchronized (lock) {
-                while (state == STARTED) {
+                while (state == RECOVERING) {
                     try {
-                        channel = bind();
+                        closeChannels();
+                        bind();
+                        setStarted();
                         break;
                     } catch (InterruptedException e) {
                         LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e);
@@ -329,10 +358,7 @@
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Error while shutting down http server executor", e);
         }
-        if (channel != null) {
-            channel.close();
-            channel.closeFuture().sync();
-        }
+        closeChannels();
     }
 
     public IServlet getServlet(FullHttpRequest request) {
@@ -369,8 +395,8 @@
 
     @Override
     public String toString() {
-        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + address + ",\"state\":\"" + getState()
-                + "\"}";
+        return "{\"class\":\"" + getClass().getSimpleName() + "\",\"address\":" + defaultAddress + ",\"state\":\""
+                + getState() + "\"}";
     }
 
     public HttpServerConfig getConfig() {
@@ -378,6 +404,17 @@
     }
 
     public InetSocketAddress getAddress() {
-        return address;
+        return defaultAddress;
+    }
+
+    private void closeChannels() throws InterruptedException {
+        synchronized (lock) {
+            for (Channel channel : channels) {
+                channel.closeFuture().removeListener(channelCloseListener);
+                channel.close();
+                channel.closeFuture().sync();
+            }
+            channels.clear();
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
index 84c8c65..76438aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/test/http/HttpServerTest.java
@@ -28,6 +28,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -263,7 +264,10 @@
         WebManager webMgr = new WebManager();
         final HttpServerConfig config = HttpServerConfigBuilder.custom().setThreadCount(numExecutors)
                 .setRequestQueueSize(serverQueueSize).build();
-        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, config);
+        List<InetSocketAddress> addresses = new ArrayList<>();
+        addresses.add(new InetSocketAddress(PORT));
+        addresses.add(new InetSocketAddress(PORT + 1));
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
         ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
         server.addServlet(servlet);
         webMgr.add(server);
@@ -276,12 +280,12 @@
             }
             Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
             // close the channel
-            Field channelField = server.getClass().getDeclaredField("channel");
+            Field channelField = server.getClass().getDeclaredField("channels");
             channelField.setAccessible(true);
             Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
             recoveryThreadField.setAccessible(true);
-            Channel channel = (Channel) channelField.get(server);
-            channel.close();
+            List<Channel> channels = (ArrayList<Channel>) channelField.get(server);
+            channels.get(0).close();
             Thread.sleep(1000);
             final int sleeps = 10;
             for (int i = 0; i < sleeps; i++) {
@@ -409,6 +413,43 @@
         }
     }
 
+    @Test
+    public void multiAddressServerTest() throws Exception {
+        final WebManager webMgr = new WebManager();
+        final HttpServerConfig config =
+                HttpServerConfigBuilder.custom().setThreadCount(16).setRequestQueueSize(16).build();
+        List<Integer> ports = Arrays.asList(PORT, PORT + 1);
+        List<InetSocketAddress> addresses = new ArrayList<>();
+        for (Integer port : ports) {
+            addresses.add(new InetSocketAddress(port));
+        }
+        HttpServer server = new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), addresses, config, null);
+        EchoServlet servlet = new EchoServlet(server.ctx(), PATH);
+        server.addServlet(servlet);
+        webMgr.add(server);
+        webMgr.start();
+        try {
+            for (Integer port : ports) {
+                try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+                    final URI uri = new URI(HttpServerTest.PROTOCOL, null, HttpServerTest.HOST, port,
+                            HttpServerTest.PATH, null, null);
+                    final HttpPost postRequest = new HttpPost(uri);
+                    final String requestBody = "test";
+                    final StringEntity chunkedEntity = new StringEntity(requestBody);
+                    chunkedEntity.setChunked(true);
+                    postRequest.setEntity(chunkedEntity);
+                    try (CloseableHttpResponse response = httpClient.execute(postRequest)) {
+                        final String responseBody = EntityUtils.toString(response.getEntity());
+                        Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpResponseStatus.OK.code());
+                        Assert.assertEquals(responseBody, requestBody);
+                    }
+                }
+            }
+        } finally {
+            webMgr.stop();
+        }
+    }
+
     private void request(int count) throws URISyntaxException {
         request(count, 0);
     }