[NO ISSUE][NET] Do Not Track Incoming IPC Connections Handles
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Incoming IPC connections use a random port every time
they are established even if they are coming from the
same remote node. We should not track any incoming
connections handles as the IPCConnectionManager only
needs to keep track of out-going connections to avoid
establishing multiple out-going connections to the same
destination.
- Unregister IPC handles from IPCConnectionManager when
they are closed.
Change-Id: I8e2328c3baf4d208bc9f4de7ce6c8d49ac7d4a61
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3291
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index b4828e9..8cdd485 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -36,7 +36,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.NetworkUtil;
@@ -137,6 +136,17 @@
ipcHandleMap.put(handle.getRemoteAddress(), handle);
}
+ synchronized void unregisterHandle(IPCHandle handle) {
+ final InetSocketAddress remoteAddress = handle.getRemoteAddress();
+ if (remoteAddress != null) {
+ final IPCHandle ipcHandle = ipcHandleMap.get(remoteAddress);
+ // remove only if in closed state to avoid removing a new handle that was created for the same destination
+ if (ipcHandle != null && ipcHandle.getState() == HandleState.CLOSED) {
+ ipcHandleMap.remove(remoteAddress);
+ }
+ }
+ }
+
synchronized void write(Message msg) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Enqueued message: " + msg);
@@ -411,7 +421,9 @@
if (key != null) {
final Object attachment = key.attachment();
if (attachment != null) {
- ((IPCHandle) attachment).close();
+ final IPCHandle handle = (IPCHandle) attachment;
+ handle.close();
+ unregisterHandle(handle);
}
key.cancel();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 09c7c97..cdd7bfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -158,7 +158,6 @@
final boolean error = message.getFlag() == Message.ERROR;
if (!error && state == HandleState.CONNECT_RECEIVED) {
remoteAddress = (InetSocketAddress) message.getPayload();
- system.getConnectionManager().registerHandle(this);
setState(HandleState.CONNECTED);
system.getConnectionManager().ack(this, message);
} else if (!error && state == HandleState.CONNECT_SENT) {