[NO ISSUE][HYR][NET] Send accepted messages prior to ipc shutdown
Change-Id: Ia7bb36a0552e21bd3d67b0882c8898af9b74d59d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5023
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 58aa39e..eaae8e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -37,12 +37,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.NetworkUtil;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,6 +54,7 @@
// TODO(mblow): the next two could be config parameters
private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+ private static final int MAX_STOP_JOIN_WAIT_MILLIS = 30000;
private final IPCSystem system;
@@ -107,6 +110,11 @@
stopped = true;
NetworkUtil.closeQuietly(serverSocketChannel);
networkThread.selector.wakeup();
+ InvokeUtil.doUninterruptibly(() -> networkThread.join(MAX_STOP_JOIN_WAIT_MILLIS));
+ if (networkThread.isAlive()) {
+ LOGGER.warn("giving up after waiting {}s for networkThread to exit",
+ TimeUnit.MILLISECONDS.toSeconds(MAX_STOP_JOIN_WAIT_MILLIS));
+ }
}
IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException {
@@ -154,7 +162,14 @@
}
}
- synchronized void write(Message msg) {
+ synchronized void send(Message msg) throws IPCException {
+ if (stopped) {
+ throw new IPCException("ipc system has been stopped");
+ }
+ write(msg);
+ }
+
+ private synchronized void write(Message msg) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Enqueued message: " + msg);
}
@@ -216,9 +231,24 @@
processSelectedKeys();
}
} catch (Exception e) {
- LOGGER.log(Level.ERROR, "Exception processing message", e);
+ LOGGER.error("Exception processing message", e);
}
}
+ // process any last work we accepted prior to being stopped, before we terminate
+ collectOutstandingWork();
+ LOGGER.trace("had {} pending messages at stop time!", workingSendList.size());
+ if (!workingSendList.isEmpty()) {
+ sendPendingMessages();
+ }
+ try {
+ int n = selector.selectNow();
+ LOGGER.trace("had {} keys remaining at stop time!", n);
+ if (n > 0) {
+ processSelectedKeys();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception processing message", e);
+ }
}
private void processSelectedKeys() {
@@ -340,6 +370,7 @@
IPCHandle handle = msg.getIPCHandle();
if (handle.getState() == HandleState.CLOSED) {
// message will never be sent
+ LOGGER.info("Could not send message: {}, due to {}", msg, handle);
return true;
}
if (handle.full()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 25abfe1..ddcc677 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -81,7 +81,7 @@
msg.setFlag(Message.NORMAL);
msg.setPayload(req);
}
- system.getConnectionManager().write(msg);
+ system.getConnectionManager().send(msg);
return mid;
}