[NO ISSUE][HYR][NET] Send accepted messages prior to ipc shutdown

Change-Id: Ia7bb36a0552e21bd3d67b0882c8898af9b74d59d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5023
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 58aa39e..eaae8e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -37,12 +37,14 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.NetworkUtil;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,6 +54,7 @@
     // TODO(mblow): the next two could be config parameters
     private static final int INITIAL_RETRY_DELAY_MILLIS = 100;
     private static final int MAX_RETRY_DELAY_MILLIS = 15000;
+    private static final int MAX_STOP_JOIN_WAIT_MILLIS = 30000;
 
     private final IPCSystem system;
 
@@ -107,6 +110,11 @@
         stopped = true;
         NetworkUtil.closeQuietly(serverSocketChannel);
         networkThread.selector.wakeup();
+        InvokeUtil.doUninterruptibly(() -> networkThread.join(MAX_STOP_JOIN_WAIT_MILLIS));
+        if (networkThread.isAlive()) {
+            LOGGER.warn("giving up after waiting {}s for networkThread to exit",
+                    TimeUnit.MILLISECONDS.toSeconds(MAX_STOP_JOIN_WAIT_MILLIS));
+        }
     }
 
     IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int maxRetries) throws IOException, InterruptedException {
@@ -154,7 +162,14 @@
         }
     }
 
-    synchronized void write(Message msg) {
+    synchronized void send(Message msg) throws IPCException {
+        if (stopped) {
+            throw new IPCException("ipc system has been stopped");
+        }
+        write(msg);
+    }
+
+    private synchronized void write(Message msg) {
         if (LOGGER.isTraceEnabled()) {
             LOGGER.trace("Enqueued message: " + msg);
         }
@@ -216,9 +231,24 @@
                         processSelectedKeys();
                     }
                 } catch (Exception e) {
-                    LOGGER.log(Level.ERROR, "Exception processing message", e);
+                    LOGGER.error("Exception processing message", e);
                 }
             }
+            // process any last work we accepted prior to being stopped, before we terminate
+            collectOutstandingWork();
+            LOGGER.trace("had {} pending messages at stop time!", workingSendList.size());
+            if (!workingSendList.isEmpty()) {
+                sendPendingMessages();
+            }
+            try {
+                int n = selector.selectNow();
+                LOGGER.trace("had {} keys remaining at stop time!", n);
+                if (n > 0) {
+                    processSelectedKeys();
+                }
+            } catch (Exception e) {
+                LOGGER.error("Exception processing message", e);
+            }
         }
 
         private void processSelectedKeys() {
@@ -340,6 +370,7 @@
             IPCHandle handle = msg.getIPCHandle();
             if (handle.getState() == HandleState.CLOSED) {
                 // message will never be sent
+                LOGGER.info("Could not send message: {}, due to {}", msg, handle);
                 return true;
             }
             if (handle.full()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 25abfe1..ddcc677 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -81,7 +81,7 @@
             msg.setFlag(Message.NORMAL);
             msg.setPayload(req);
         }
-        system.getConnectionManager().write(msg);
+        system.getConnectionManager().send(msg);
         return mid;
     }