[NO ISSUE][NET] Networking Improvements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Set keep alive and no TCP delay options
  on socket channels.
- Cancel key and close IPC handle on failed
  read/writes to avoid getting the same failures
  with every NetworkThread loop.

Change-Id: I60c1f9cfe2ea577fca14cd2e98c6461c49df011a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2418
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index d3ddc43..bfac451 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -32,7 +32,7 @@
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.asterix.replication.sync.ReplicaSynchronizer;
-import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
@@ -97,6 +97,7 @@
         try {
             if (sc == null || !sc.isOpen() || !sc.isConnected()) {
                 sc = SocketChannel.open();
+                NetworkUtil.configure(sc);
                 sc.configureBlocking(true);
                 sc.connect(id.getLocation());
             }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index a092322..8ccfced 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -78,6 +79,7 @@
         try {
             if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) {
                 logRepChannel = SocketChannel.open();
+                NetworkUtil.configure(logRepChannel);
                 logRepChannel.configureBlocking(true);
                 logRepChannel.connect(location);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 7f492eb..7f59db1 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -53,5 +53,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 86c8c75..3e6c64b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 import java.nio.channels.ClosedChannelException;
@@ -41,6 +40,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -218,14 +218,12 @@
                     if (!workingPendingConnections.isEmpty()) {
                         for (IPCHandle handle : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
-                            openChannels.add(channel);
-                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                            channel.configureBlocking(false);
+                            register(channel);
                             SelectionKey cKey;
                             if (channel.connect(handle.getRemoteAddress())) {
                                 cKey = channel.register(selector, SelectionKey.OP_READ);
                                 handle.setState(HandleState.CONNECT_SENT);
-                                write(createInitialReqMessage(handle));
+                                IPCConnectionManager.this.write(createInitialReqMessage(handle));
                             } else {
                                 cKey = channel.register(selector, SelectionKey.OP_CONNECT);
                             }
@@ -273,48 +271,15 @@
                         for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
                             SelectionKey key = i.next();
                             i.remove();
-                            SelectableChannel sc = key.channel();
+                            final SelectableChannel sc = key.channel();
                             if (key.isReadable()) {
-                                SocketChannel channel = (SocketChannel) sc;
-                                IPCHandle handle = (IPCHandle) key.attachment();
-                                ByteBuffer readBuffer = handle.getInBuffer();
-                                int len = channel.read(readBuffer);
-                                system.getPerformanceCounters().addMessageBytesReceived(len);
-                                if (len < 0) {
-                                    key.cancel();
-                                    IOUtils.closeQuietly(channel);
-                                    openChannels.remove(channel);
-                                    handle.close();
-                                } else {
-                                    handle.processIncomingMessages();
-                                    if (!readBuffer.hasRemaining()) {
-                                        handle.resizeInBuffer();
-                                    }
-                                }
+                                read(key);
                             } else if (key.isWritable()) {
-                                SocketChannel channel = (SocketChannel) sc;
-                                IPCHandle handle = (IPCHandle) key.attachment();
-                                ByteBuffer writeBuffer = handle.getOutBuffer();
-                                int len = channel.write(writeBuffer);
-                                system.getPerformanceCounters().addMessageBytesSent(len);
-                                if (len < 0) {
-                                    key.cancel();
-                                    IOUtils.closeQuietly(channel);
-                                    openChannels.remove(channel);
-                                    handle.close();
-                                } else if (!writeBuffer.hasRemaining()) {
-                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
-                                }
-                                if (handle.full()) {
-                                    handle.clearFull();
-                                    selector.wakeup();
-                                }
+                                write(key);
                             } else if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
-                                openChannels.add(channel);
-                                channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                                channel.configureBlocking(false);
+                                register(channel);
                                 IPCHandle handle = new IPCHandle(system, null);
                                 SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
                                 handle.setKey(cKey);
@@ -331,7 +296,7 @@
                                 handle.setState(HandleState.CONNECT_SENT);
                                 registerHandle(handle);
                                 key.interestOps(SelectionKey.OP_READ);
-                                write(createInitialReqMessage(handle));
+                                IPCConnectionManager.this.write(createInitialReqMessage(handle));
                             }
                         }
                     }
@@ -378,6 +343,65 @@
             workingSendList.clear();
             moveAll(tempUnsentMessages, workingSendList);
         }
+
+        private void register(SocketChannel channel) throws IOException {
+            openChannels.add(channel);
+            NetworkUtil.configure(channel);
+            channel.configureBlocking(false);
+        }
+
+        private void read(SelectionKey readableKey) {
+            SocketChannel channel = (SocketChannel) readableKey.channel();
+            IPCHandle handle = (IPCHandle) readableKey.attachment();
+            ByteBuffer readBuffer = handle.getInBuffer();
+            try {
+                int len = channel.read(readBuffer);
+                if (len < 0) {
+                    close(readableKey, channel);
+                    return;
+                }
+                system.getPerformanceCounters().addMessageBytesReceived(len);
+                handle.processIncomingMessages();
+                if (!readBuffer.hasRemaining()) {
+                    handle.resizeInBuffer();
+                }
+            } catch (IOException e) {
+                LOGGER.error("TCP read error from {}", handle.getRemoteAddress(), e);
+                close(readableKey, channel);
+            }
+        }
+
+        private void write(SelectionKey writableKey) {
+            SocketChannel channel = (SocketChannel) writableKey.channel();
+            IPCHandle handle = (IPCHandle) writableKey.attachment();
+            ByteBuffer writeBuffer = handle.getOutBuffer();
+            try {
+                int len = channel.write(writeBuffer);
+                if (len < 0) {
+                    close(writableKey, channel);
+                    return;
+                }
+                system.getPerformanceCounters().addMessageBytesSent(len);
+                if (!writeBuffer.hasRemaining()) {
+                    writableKey.interestOps(writableKey.interestOps() & ~SelectionKey.OP_WRITE);
+                }
+                if (handle.full()) {
+                    handle.clearFull();
+                    selector.wakeup();
+                }
+            } catch (IOException e) {
+                LOGGER.error("TCP write error to {}", handle.getRemoteAddress(), e);
+                close(writableKey, channel);
+            }
+        }
+
+        private void close(SelectionKey key, SocketChannel sc) {
+            key.cancel();
+            NetworkUtil.closeQuietly(sc);
+            openChannels.remove(sc);
+            final IPCHandle handle = (IPCHandle) key.attachment();
+            handle.close();
+        }
     }
 
     private <T> void moveAll(List<T> source, List<T> target) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 36e33c5..1040e81 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -55,5 +55,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 4633cf3..b2efe7f 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.StandardSocketOptions;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -31,6 +30,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -129,9 +129,7 @@
                     if (!workingPendingConnections.isEmpty()) {
                         for (InetSocketAddress address : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
-                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
-                            channel.configureBlocking(false);
+                            register(channel);
                             boolean connect = false;
                             boolean failure = false;
                             try {
@@ -156,9 +154,7 @@
                     }
                     if (!workingIncomingConnections.isEmpty()) {
                         for (SocketChannel channel : workingIncomingConnections) {
-                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
-                            channel.configureBlocking(false);
+                            register(channel);
                             SelectionKey sKey = channel.register(selector, 0);
                             TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
                             sKey.attach(connection);
@@ -211,7 +207,7 @@
                         }
                     }
                 } catch (Exception e) {
-                    LOGGER.error("Error in TCPEndpoint " + localAddress, e);
+                    LOGGER.error("Error in TCPEndpoint {}", localAddress, e);
                 }
             }
         }
@@ -250,5 +246,10 @@
                 incomingConnections.clear();
             }
         }
+
+        private void register(SocketChannel channel) throws IOException {
+            NetworkUtil.configure(channel);
+            channel.configureBlocking(false);
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
new file mode 100644
index 0000000..f9f45c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SocketChannel;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class NetworkUtil {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private NetworkUtil() {
+    }
+
+    public static void configure(SocketChannel sc) throws IOException {
+        sc.setOption(StandardSocketOptions.TCP_NODELAY, true);
+        sc.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
+    }
+
+    public static void closeQuietly(SocketChannel sc) {
+        if (sc.isOpen()) {
+            try {
+                sc.close();
+            } catch (IOException e) {
+                LOGGER.warn("Failed to close socket", e);
+            }
+        }
+    }
+}