[NO ISSUE][NET] Networking Improvements
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Set keep alive and no TCP delay options
on socket channels.
- Cancel key and close IPC handle on failed
read/writes to avoid getting the same failures
with every NetworkThread loop.
Change-Id: I60c1f9cfe2ea577fca14cd2e98c6461c49df011a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2418
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index d3ddc43..bfac451 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -32,7 +32,7 @@
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.replication.sync.ReplicaSynchronizer;
-import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.hyracks.util.StorageUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.LogManager;
@@ -97,6 +97,7 @@
try {
if (sc == null || !sc.isOpen() || !sc.isConnected()) {
sc = SocketChannel.open();
+ NetworkUtil.configure(sc);
sc.configureBlocking(true);
sc.connect(id.getLocation());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index a092322..8ccfced 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -78,6 +79,7 @@
try {
if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) {
logRepChannel = SocketChannel.open();
+ NetworkUtil.configure(logRepChannel);
logRepChannel.configureBlocking(true);
logRepChannel.connect(location);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 7f492eb..7f59db1 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -53,5 +53,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 86c8c75..3e6c64b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
@@ -41,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -218,14 +218,12 @@
if (!workingPendingConnections.isEmpty()) {
for (IPCHandle handle : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
- openChannels.add(channel);
- channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
- channel.configureBlocking(false);
+ register(channel);
SelectionKey cKey;
if (channel.connect(handle.getRemoteAddress())) {
cKey = channel.register(selector, SelectionKey.OP_READ);
handle.setState(HandleState.CONNECT_SENT);
- write(createInitialReqMessage(handle));
+ IPCConnectionManager.this.write(createInitialReqMessage(handle));
} else {
cKey = channel.register(selector, SelectionKey.OP_CONNECT);
}
@@ -273,48 +271,15 @@
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
- SelectableChannel sc = key.channel();
+ final SelectableChannel sc = key.channel();
if (key.isReadable()) {
- SocketChannel channel = (SocketChannel) sc;
- IPCHandle handle = (IPCHandle) key.attachment();
- ByteBuffer readBuffer = handle.getInBuffer();
- int len = channel.read(readBuffer);
- system.getPerformanceCounters().addMessageBytesReceived(len);
- if (len < 0) {
- key.cancel();
- IOUtils.closeQuietly(channel);
- openChannels.remove(channel);
- handle.close();
- } else {
- handle.processIncomingMessages();
- if (!readBuffer.hasRemaining()) {
- handle.resizeInBuffer();
- }
- }
+ read(key);
} else if (key.isWritable()) {
- SocketChannel channel = (SocketChannel) sc;
- IPCHandle handle = (IPCHandle) key.attachment();
- ByteBuffer writeBuffer = handle.getOutBuffer();
- int len = channel.write(writeBuffer);
- system.getPerformanceCounters().addMessageBytesSent(len);
- if (len < 0) {
- key.cancel();
- IOUtils.closeQuietly(channel);
- openChannels.remove(channel);
- handle.close();
- } else if (!writeBuffer.hasRemaining()) {
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- }
- if (handle.full()) {
- handle.clearFull();
- selector.wakeup();
- }
+ write(key);
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
- openChannels.add(channel);
- channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
- channel.configureBlocking(false);
+ register(channel);
IPCHandle handle = new IPCHandle(system, null);
SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
handle.setKey(cKey);
@@ -331,7 +296,7 @@
handle.setState(HandleState.CONNECT_SENT);
registerHandle(handle);
key.interestOps(SelectionKey.OP_READ);
- write(createInitialReqMessage(handle));
+ IPCConnectionManager.this.write(createInitialReqMessage(handle));
}
}
}
@@ -378,6 +343,65 @@
workingSendList.clear();
moveAll(tempUnsentMessages, workingSendList);
}
+
+ private void register(SocketChannel channel) throws IOException {
+ openChannels.add(channel);
+ NetworkUtil.configure(channel);
+ channel.configureBlocking(false);
+ }
+
+ private void read(SelectionKey readableKey) {
+ SocketChannel channel = (SocketChannel) readableKey.channel();
+ IPCHandle handle = (IPCHandle) readableKey.attachment();
+ ByteBuffer readBuffer = handle.getInBuffer();
+ try {
+ int len = channel.read(readBuffer);
+ if (len < 0) {
+ close(readableKey, channel);
+ return;
+ }
+ system.getPerformanceCounters().addMessageBytesReceived(len);
+ handle.processIncomingMessages();
+ if (!readBuffer.hasRemaining()) {
+ handle.resizeInBuffer();
+ }
+ } catch (IOException e) {
+ LOGGER.error("TCP read error from {}", handle.getRemoteAddress(), e);
+ close(readableKey, channel);
+ }
+ }
+
+ private void write(SelectionKey writableKey) {
+ SocketChannel channel = (SocketChannel) writableKey.channel();
+ IPCHandle handle = (IPCHandle) writableKey.attachment();
+ ByteBuffer writeBuffer = handle.getOutBuffer();
+ try {
+ int len = channel.write(writeBuffer);
+ if (len < 0) {
+ close(writableKey, channel);
+ return;
+ }
+ system.getPerformanceCounters().addMessageBytesSent(len);
+ if (!writeBuffer.hasRemaining()) {
+ writableKey.interestOps(writableKey.interestOps() & ~SelectionKey.OP_WRITE);
+ }
+ if (handle.full()) {
+ handle.clearFull();
+ selector.wakeup();
+ }
+ } catch (IOException e) {
+ LOGGER.error("TCP write error to {}", handle.getRemoteAddress(), e);
+ close(writableKey, channel);
+ }
+ }
+
+ private void close(SelectionKey key, SocketChannel sc) {
+ key.cancel();
+ NetworkUtil.closeQuietly(sc);
+ openChannels.remove(sc);
+ final IPCHandle handle = (IPCHandle) key.attachment();
+ handle.close();
+ }
}
private <T> void moveAll(List<T> source, List<T> target) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 36e33c5..1040e81 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -55,5 +55,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 4633cf3..b2efe7f 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
-import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -31,6 +30,7 @@
import java.util.Iterator;
import java.util.List;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -129,9 +129,7 @@
if (!workingPendingConnections.isEmpty()) {
for (InetSocketAddress address : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
- channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
- channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
- channel.configureBlocking(false);
+ register(channel);
boolean connect = false;
boolean failure = false;
try {
@@ -156,9 +154,7 @@
}
if (!workingIncomingConnections.isEmpty()) {
for (SocketChannel channel : workingIncomingConnections) {
- channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
- channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
- channel.configureBlocking(false);
+ register(channel);
SelectionKey sKey = channel.register(selector, 0);
TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
sKey.attach(connection);
@@ -211,7 +207,7 @@
}
}
} catch (Exception e) {
- LOGGER.error("Error in TCPEndpoint " + localAddress, e);
+ LOGGER.error("Error in TCPEndpoint {}", localAddress, e);
}
}
}
@@ -250,5 +246,10 @@
incomingConnections.clear();
}
}
+
+ private void register(SocketChannel channel) throws IOException {
+ NetworkUtil.configure(channel);
+ channel.configureBlocking(false);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
new file mode 100644
index 0000000..f9f45c1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SocketChannel;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class NetworkUtil {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private NetworkUtil() {
+ }
+
+ public static void configure(SocketChannel sc) throws IOException {
+ sc.setOption(StandardSocketOptions.TCP_NODELAY, true);
+ sc.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
+ }
+
+ public static void closeQuietly(SocketChannel sc) {
+ if (sc.isOpen()) {
+ try {
+ sc.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close socket", e);
+ }
+ }
+ }
+}