[ASTERIXDB-2479][API] Add Network Diagnostics API

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add a new API that shows the state of node-to-node
  connections and their logical channels to help diagnose
  networking issues.
- Add channel details to the waiting thread name in
  NetworkOutputChannel.
- Add test case.

Change-Id: Id6fd5a96c56e7078d1404bebcbab8afe93ba8f64
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3025
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 192a80a..525543f 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -60,5 +60,9 @@
       <artifactId>hyracks-util</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index f7ef2aa..3a35212 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -29,6 +29,10 @@
 import org.apache.hyracks.api.comm.IChannelWriteInterface;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Handle to a channel that represents a logical full-duplex communication end-point.
@@ -168,4 +172,22 @@
     public InetSocketAddress getRemoteAddress() {
         return cSet.getMultiplexedConnection().getRemoteAddress();
     }
+
+    /**
+     * Builds a JSON description of this channel's current state.
+     *
+     * @return an object with the channel id, the local/remote close and close-ack values, the read and write
+     *         credits, and the result of {@code completelyClosed()}
+     */
+    public JsonNode getState() {
+        return JSONUtil.createObject()
+                .put("id", channelId)
+                .put("localClose", localClose.get())
+                .put("localCloseAck", localCloseAck.get())
+                .put("remoteClose", remoteClose.get())
+                .put("remoteCloseAck", remoteCloseAck.get())
+                .put("readCredits", ri.getCredits())
+                .put("writeCredits", wi.getCredits())
+                .put("completelyClosed", completelyClosed());
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index f5cdf2c..179f42c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -24,9 +24,12 @@
 import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.comm.MuxDemuxCommand;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.util.JSONUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
 public class ChannelSet {
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -243,4 +246,20 @@
     public MultiplexedConnection getMultiplexedConnection() {
         return mConn;
     }
+
+    /**
+     * Collects the state of every channel held in {@code ccbArray}; null slots are skipped.
+     *
+     * @return a JSON array with one entry per non-null channel control block
+     */
+    public synchronized ArrayNode getState() {
+        final ArrayNode channels = JSONUtil.createArray();
+        for (ChannelControlBlock channel : ccbArray) {
+            if (channel == null) {
+                continue;
+            }
+            channels.add(channel.getState());
+        }
+        return channels;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 4c3836a..96ccafb 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -24,6 +24,7 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.BitSet;
+import java.util.Optional;
 
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
@@ -32,10 +33,15 @@
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
+import org.apache.hyracks.util.JSONUtil;
 import org.apache.hyracks.util.annotations.ThreadSafetyGuaranteedBy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * A {@link MultiplexedConnection} can be used by clients to create multiple "channels"
  * that can have independent full-duplex conversations.
@@ -442,4 +448,20 @@
     public InetSocketAddress getRemoteAddress() {
         return tcpConnection == null ? null : tcpConnection.getRemoteAddress();
     }
+
+    /**
+     * Describes this connection and its channels as JSON.
+     *
+     * @return an object with the remote address and the channel states, or {@link Optional#empty()} when
+     *         {@code tcpConnection} is null
+     */
+    public synchronized Optional<JsonNode> getState() {
+        if (tcpConnection != null) {
+            final ObjectNode json = JSONUtil.createObject();
+            json.put("remoteAddress", getRemoteAddress().toString());
+            json.set("channels", cSet.getState());
+            return Optional.of(json);
+        }
+        return Optional.empty();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index c58cb86..4ee7e83 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -28,6 +28,11 @@
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
+import org.apache.hyracks.util.JSONUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
  * Multiplexed Connection Manager.
@@ -43,7 +48,8 @@
 
     private final int maxConnectionAttempts;
 
-    private final Map<InetSocketAddress, MultiplexedConnection> connectionMap;
+    private final Map<InetSocketAddress, MultiplexedConnection> outgoingConnectionMap;
+    private final Map<InetSocketAddress, MultiplexedConnection> incomingConnectionMap;
 
     private final TCPEndpoint tcpEndpoint;
 
@@ -69,13 +75,14 @@
         this.channelOpenListener = listener;
         this.maxConnectionAttempts = maxConnectionAttempts;
         this.channelInterfaceFatory = channelInterfaceFatory;
-        connectionMap = new HashMap<>();
+        outgoingConnectionMap = new HashMap<>();
+        incomingConnectionMap = new HashMap<>();
         this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
             @Override
             public void connectionEstablished(TCPConnection connection) {
                 MultiplexedConnection mConn;
                 synchronized (MuxDemux.this) {
-                    mConn = connectionMap.get(connection.getRemoteAddress());
+                    mConn = outgoingConnectionMap.get(connection.getRemoteAddress());
                 }
                 assert mConn != null;
                 mConn.setTCPConnection(connection);
@@ -89,17 +96,21 @@
                 mConn.setTCPConnection(connection);
                 connection.setEventListener(mConn);
                 connection.setAttachment(mConn);
+                // every other access to the connection maps holds the MuxDemux monitor, so take it here too
+                synchronized (MuxDemux.this) {
+                    incomingConnectionMap.put(connection.getRemoteAddress(), mConn);
+                }
             }
 
             @Override
             public void connectionFailure(InetSocketAddress remoteAddress, IOException error) {
                 MultiplexedConnection mConn;
                 synchronized (MuxDemux.this) {
-                    mConn = connectionMap.get(remoteAddress);
+                    mConn = outgoingConnectionMap.get(remoteAddress);
                     assert mConn != null;
                     int nConnectionAttempts = mConn.getConnectionAttempts();
                     if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) {
-                        connectionMap.remove(remoteAddress);
+                        outgoingConnectionMap.remove(remoteAddress);
                         mConn.setConnectionFailure(new IOException(remoteAddress.toString() + ": " + error, error));
                     } else {
                         mConn.setConnectionAttempts(nConnectionAttempts + 1);
@@ -112,7 +123,9 @@
             public void connectionClosed(TCPConnection connection) {
                 synchronized (MuxDemux.this) {
                     if (connection.getType() == TCPConnection.ConnectionType.OUTGOING) {
-                        connectionMap.remove(connection.getRemoteAddress());
+                        outgoingConnectionMap.remove(connection.getRemoteAddress());
+                    } else if (connection.getType() == TCPConnection.ConnectionType.INCOMING) {
+                        incomingConnectionMap.remove(connection.getRemoteAddress());
                     }
                 }
             }
@@ -144,10 +157,10 @@
     public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
         MultiplexedConnection mConn;
         synchronized (this) {
-            mConn = connectionMap.get(remoteAddress);
+            mConn = outgoingConnectionMap.get(remoteAddress);
             if (mConn == null) {
                 mConn = new MultiplexedConnection(this);
-                connectionMap.put(remoteAddress, mConn);
+                outgoingConnectionMap.put(remoteAddress, mConn);
                 tcpEndpoint.initiateConnection(remoteAddress);
             }
         }
@@ -186,4 +199,20 @@
     public IChannelInterfaceFactory getChannelInterfaceFactory() {
         return channelInterfaceFatory;
     }
+
+    /**
+     * Returns a JSON snapshot of the local address plus the states of outgoing and incoming connections.
+     */
+    public synchronized JsonNode getState() {
+        final ObjectNode json = JSONUtil.createObject();
+        json.put("localAddress", tcpEndpoint.getLocalAddress().toString());
+        final ArrayNode outgoing = JSONUtil.createArray();
+        final ArrayNode incoming = JSONUtil.createArray();
+        json.set("outgoingConnections", outgoing);
+        json.set("incomingConnections", incoming);
+        // a connection contributes an entry only when its getState() yields a value
+        outgoingConnectionMap.values().forEach(mConn -> mConn.getState().ifPresent(outgoing::add));
+        incomingConnectionMap.values().forEach(mConn -> mConn.getState().ifPresent(incoming::add));
+        return json;
+    }
 }
\ No newline at end of file