[ASTERIXDB-2282][HTTP] Revive HTTP server on unexpected channel drops
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Previously, when the http server channel drops unexpectedly, we
did nothing.
- After this change, the http server will log the event and try
to re-bind to the port until it either succeeds or
server.stop() is invoked.
Change-Id: I7da75a9e34795c94518aca243b4cef387221d8fd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2382
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 19436ab..8ce1d70 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -73,7 +73,8 @@
private final ThreadPoolExecutor executor;
// Mutable members
private volatile int state = STOPPED;
- private Channel channel;
+ private volatile Thread recoveryThread;
+ private volatile Channel channel;
private Throwable cause;
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, int port) {
@@ -134,6 +135,14 @@
throw e;
}
}
+ // Should wait for the recovery thread outside synchronized block
+ Thread rt = recoveryThread;
+ if (rt != null) {
+ rt.join(TimeUnit.SECONDS.toMillis(5));
+ if (recoveryThread != null) {
+ LOGGER.log(Level.ERROR, "Failure stopping recovery thread of {}", this);
+ }
+ }
}
/**
@@ -209,6 +218,10 @@
* Note that it doesn't work for the case where multiple paths map to a single IServlet
*/
Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - l1.getPaths()[0].length());
+ channel = bind();
+ }
+
+ private Channel bind() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECEIVE_BUFFER_SIZE))
@@ -216,10 +229,74 @@
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new HttpServerInitializer(this));
- channel = b.bind(port).sync().channel();
+ Channel newChannel = b.bind(port).sync().channel();
+ newChannel.closeFuture().addListener(f -> {
+ // This listener is invoked from within a netty IO thread. Hence, we can never block it
+ // For simplicity, we will submit the recovery task to a different thread
+ synchronized (lock) {
+ if (state != STARTED) {
+ return;
+ }
+ LOGGER.log(Level.WARN, "{} has stopped unexpectedly. Starting server recovery", this);
+ triggerRecovery();
+ }
+ });
+ return newChannel;
+ }
+
+ private void triggerRecovery() {
+ Thread rt = recoveryThread;
+ if (rt != null) {
+ try {
+ rt.join();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, this + " recovery was interrupted", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ // try to revive the channel
+ recoveryThread = new Thread(this::recover);
+ recoveryThread.start();
+ }
+
+ public void recover() {
+ try {
+ synchronized (lock) {
+ while (state == STARTED) {
+ try {
+ channel = bind();
+ break;
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, this + " was interrupted while attempting to revive server channel", e);
+ setFailed(e);
+ Thread.currentThread().interrupt();
+ } catch (Throwable th) {
+ // sleep for 5s
+ LOGGER.log(Level.WARN, this + " failed server recovery attempt. "
+ + "Sleeping for 5s before starting the next attempt", th);
+ try {
+ // Wait on lock to allow stop request to be executed
+ lock.wait(TimeUnit.SECONDS.toMillis(5));
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, this + " interrupted while attempting to revive server channel", e);
+ setFailed(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ } finally {
+ recoveryThread = null;
+ }
}
protected void doStop() throws InterruptedException {
+ // stop recovery if it was ongoing
+ Thread rt = recoveryThread;
+ if (rt != null) {
+ rt.interrupt();
+ }
// stop taking new requests
executor.shutdown();
try {
@@ -300,4 +377,10 @@
public int getWorkQueueSize() {
return workQueue.size();
}
+
+ @Override
+ public String toString() {
+ return "{\"class\":\"" + getClass().getSimpleName() + "\",\"port\":" + port + ",\"state\":\"" + getState()
+ + "\"}";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
index 9067586..298d2de 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/test/java/org/apache/hyracks/http/test/HttpServerTest.java
@@ -48,13 +48,18 @@
import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.hyracks.http.servlet.ChattyServlet;
import org.apache.hyracks.http.servlet.SleepyServlet;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpResponseStatus;
public class HttpServerTest {
+ private static final Logger LOGGER = LogManager.getLogger();
static final boolean PRINT_TO_CONSOLE = false;
static final int PORT = 9898;
static final String HOST = "localhost";
@@ -241,6 +246,63 @@
}
}
+ @Test
+ public void testServerRevival() throws Exception {
+ int numExecutors = 16;
+ int serverQueueSize = 16;
+ int numRequests = 1;
+ WebManager webMgr = new WebManager();
+ HttpServer server =
+ new HttpServer(webMgr.getBosses(), webMgr.getWorkers(), PORT, numExecutors, serverQueueSize);
+ ChattyServlet servlet = new ChattyServlet(server.ctx(), new String[] { PATH });
+ server.addServlet(servlet);
+ webMgr.add(server);
+ webMgr.start();
+ try {
+ // send a request
+ request(numRequests);
+ for (Future<Void> thread : FUTURES) {
+ thread.get();
+ }
+ Assert.assertEquals(numRequests, SUCCESS_COUNT.get());
+ // close the channel
+ Field channelField = server.getClass().getDeclaredField("channel");
+ channelField.setAccessible(true);
+ Field recoveryThreadField = server.getClass().getDeclaredField("recoveryThread");
+ recoveryThreadField.setAccessible(true);
+ Channel channel = (Channel) channelField.get(server);
+ channel.close();
+ Thread.sleep(1000);
+ final int sleeps = 10;
+ for (int i = 0; i < sleeps; i++) {
+ Thread thread = (Thread) recoveryThreadField.get(server);
+ if (thread == null) {
+ break;
+ }
+ LOGGER.log(Level.WARN,
+ "Attempt #" + (i + 1) + ". Recovery thread is not null and has id " + thread.getId());
+ if (i == sleeps - 1) {
+ throw new Exception("Http server recovery didn't complete after " + sleeps + "s");
+ }
+ Thread.sleep(1000);
+ }
+ for (int i = 0; i < sleeps; i++) {
+ request(1);
+ for (Future<Void> thread : FUTURES) {
+ thread.get();
+ }
+ if (numRequests + 1 == SUCCESS_COUNT.get()) {
+ break;
+ } else if (i == sleeps - 1) {
+ throw new Exception(
+ "Http server couldn't process requests correctly after recovery for " + sleeps + "s");
+ }
+ }
+ } finally {
+ webMgr.stop();
+ }
+ }
+
public static void setPrivateField(Object obj, String filedName, Object value) throws Exception {
Field f = obj.getClass().getDeclaredField(filedName);
f.setAccessible(true);