[NO ISSUE][NET] Improve Logging in TCPEndpoint

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Include end point local address in IO thread name.
- Log TCPEndpoint/Connection addresses on failures.
- Fix multiple IO Threads initialization.
- Fix synchronization for closed connection notification.

Change-Id: I5e78755150a6424fd22f587c4311da7c60b3b55a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2487
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index 29afc6d..b0e2eed 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -101,4 +101,9 @@
             LOGGER.error(() -> "Error closing channel at: " + remoteAddress, e);
         }
     }
+
+    @Override
+    public String toString() {
+        return "TCPConnection[Remote Address: " + remoteAddress + " Local Address: " + endpoint.getLocalAddress() + "]";
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index b2efe7f..affa59e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -63,18 +63,17 @@
             serverSocket.bind(localAddress);
             this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
         }
-
         ioThreads = new IOThread[nThreads];
         for (int i = 0; i < ioThreads.length; ++i) {
             ioThreads[i] = new IOThread();
         }
-
         if (localAddress != null) {
-            ioThreads[0].registerServerSocket(serverSocketChannel);
+            for (IOThread ioThread : ioThreads) {
+                ioThread.registerServerSocket(serverSocketChannel);
+            }
         }
-
-        for (int i = 0; i < ioThreads.length; ++i) {
-            ioThreads[i].start();
+        for (IOThread ioThread : ioThreads) {
+            ioThread.start();
         }
     }
 
@@ -98,6 +97,11 @@
         return localAddress;
     }
 
+    @Override
+    public String toString() {
+        return "TCPEndpoint [Local Address: " + localAddress + "]";
+    }
+
     private class IOThread extends Thread {
         private final List<InetSocketAddress> pendingConnections;
 
@@ -110,7 +114,7 @@
         private final Selector selector;
 
         public IOThread() throws IOException {
-            super("TCPEndpoint IO Thread");
+            super("TCPEndpoint IO Thread [" + localAddress + "]");
             setDaemon(true);
             setPriority(Thread.NORM_PRIORITY);
             this.pendingConnections = new ArrayList<>();
@@ -178,10 +182,12 @@
                                 try {
                                     connection.getEventListener().notifyIOReady(connection, readable, writable);
                                 } catch (Exception e) {
-                                    LOGGER.error("Unexpected tcp io error", e);
+                                    LOGGER.error("Unexpected tcp io error in connection {}", connection, e);
                                     connection.getEventListener().notifyIOError(e);
                                     connection.close();
-                                    connectionListener.connectionClosed(connection);
+                                    synchronized (connectionListener) {
+                                        connectionListener.connectionClosed(connection);
+                                    }
                                     continue;
                                 }
                             }
@@ -195,6 +201,7 @@
                                 try {
                                     finishConnect = channel.finishConnect();
                                 } catch (IOException e) {
+                                    LOGGER.error("Failed to finish connect to channel {}", key.attachment(), e);
                                     key.cancel();
                                     synchronized (connectionListener) {
                                         connectionListener.connectionFailure((InetSocketAddress) key.attachment(), e);
@@ -207,7 +214,7 @@
                         }
                     }
                 } catch (Exception e) {
-                    LOGGER.error("Error in TCPEndpoint {}", localAddress, e);
+                    LOGGER.error("Error in TCPEndpoint {}", TCPEndpoint.this, e);
                 }
             }
         }