[ASTERIXDB-2490][NET] Support Encrypted Multiplexed Connections

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Use SocketChannelFactory in multiplex connections
  to support both unencrypted and encrypted sockets.
- Adapt TCPEndpoint to socket channels that require
  handshake.
- Adapt test cases to API changes.

Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3071
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
index 42ec795..f69c102 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
@@ -19,10 +19,10 @@
 package org.apache.asterix.messaging;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
 
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelReadInterface;
 
 public class MessagingChannelReadInterface extends AbstractChannelReadInterface {
@@ -32,7 +32,7 @@
     }
 
     @Override
-    public int read(SocketChannel sc, int size) throws IOException, NetException {
+    public int read(ISocketChannel sc, int size) throws IOException, NetException {
         while (true) {
             if (size <= 0) {
                 return size;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 357d761..4429219 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -19,9 +19,9 @@
 package org.apache.hyracks.api.comm;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
 
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 
 /**
  * Represents the read interface of a {@link IChannelControlBlock}.
@@ -68,7 +68,7 @@
      * @throws IOException
      * @throws NetException
      */
-    public int read(SocketChannel sc, int size) throws IOException, NetException;
+    public int read(ISocketChannel sc, int size) throws IOException, NetException;
 
     /**
      * Sets the read credits of this {@link IChannelReadInterface}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
index c238ae3..5fe6ecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
@@ -23,6 +23,7 @@
 import java.net.SocketAddress;
 
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -35,11 +36,12 @@
 
     private final MuxDemux md;
 
-    public ClientNetworkManager(int nThreads) throws IOException {
+    public ClientNetworkManager(int nThreads, ISocketChannelFactory socketChannelFactory) {
         /* This is a connect only socket and does not listen to any incoming connections, so pass null to
          * localAddress and listener.
          */
-        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE);
+        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE,
+                socketChannelFactory);
     }
 
     public void start() throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
index a72573c..4d8767f 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -44,7 +44,7 @@
         NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
         resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort(), socketChannelFactory);
 
-        netManager = new ClientNetworkManager(nReaders);
+        netManager = new ClientNetworkManager(nReaders, socketChannelFactory);
         netManager.start();
 
         resultClientCtx = new ResultClientContext(frameSize);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a92fcb6..317d59a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -273,12 +273,12 @@
         resultNetworkManager = new ResultNetworkManager(ncConfig.getResultListenAddress(),
                 ncConfig.getResultListenPort(), resultPartitionManager, ncConfig.getNetThreadCount(),
                 ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
-                FullFrameChannelInterfaceFactory.INSTANCE);
+                FullFrameChannelInterfaceFactory.INSTANCE, networkSecurityManager.getSocketChannelFactory());
         if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) {
             messagingNetManager = new MessagingNetworkManager(this, ncConfig.getMessagingListenAddress(),
                     ncConfig.getMessagingListenPort(), ncConfig.getNetThreadCount(),
                     ncConfig.getMessagingPublicAddress(), ncConfig.getMessagingPublicPort(),
-                    serviceCtx.getMessagingChannelInterfaceFactory());
+                    serviceCtx.getMessagingChannelInterfaceFactory(), networkSecurityManager.getSocketChannelFactory());
         }
     }
 
@@ -292,7 +292,8 @@
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager,
                 ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(),
-                ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE);
+                ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE,
+                networkSecurityManager.getSocketChannelFactory());
         netManager.start();
         startApplication();
         init();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
index a37d131..4c7270c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -53,10 +54,11 @@
     private final Map<IChannelControlBlock, ICloseableBufferAcceptor> channelFullBufferAcceptor = new HashMap<>();
 
     public MessagingNetworkManager(NodeControllerService ncs, String inetAddress, int inetPort, int nThreads,
-            String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory) {
+            String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory,
+            ISocketChannelFactory socketChannelFactory) {
         this.ncs = ncs;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory);
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 3298b78..6876618 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
@@ -60,11 +61,11 @@
 
     public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads,
             int nBuffers, String publicInetAddress, int publicInetPort,
-            IChannelInterfaceFactory channelInterfaceFactory) {
+            IChannelInterfaceFactory channelInterfaceFactory, ISocketChannelFactory socketChannelFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index ee821d6..fe8f2af 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
@@ -73,11 +74,11 @@
      */
     public ResultNetworkManager(String inetAddress, int inetPort, IResultPartitionManager partitionManager,
             int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
-            IChannelInterfaceFactory channelInterfaceFactory) {
+            IChannelInterfaceFactory channelInterfaceFactory, ISocketChannelFactory socketChannelFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 205ecfe..764ec7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -215,10 +215,10 @@
                 SelectionKey key = i.next();
                 i.remove();
                 final SelectableChannel sc = key.channel();
-                // do not attempt to read until handle is set (e.g. after handshake is completed)
+                // do not attempt to read/write until handle is set (e.g. after handshake is completed)
                 if (key.isReadable() && key.attachment() != null) {
                     read(key);
-                } else if (key.isWritable()) {
+                } else if (key.isWritable() && key.attachment() != null) {
                     write(key);
                 } else if (key.isAcceptable()) {
                     assert sc == serverSocketChannel;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 2cddf45..2dca39b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -47,6 +47,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 3a35212..0a26097 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -28,6 +27,7 @@
 import org.apache.hyracks.api.comm.IChannelReadInterface;
 import org.apache.hyracks.api.comm.IChannelWriteInterface;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
 import org.apache.hyracks.util.JSONUtil;
 
@@ -91,7 +91,7 @@
         wi.writeComplete();
     }
 
-    synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
+    synchronized int read(ISocketChannel sc, int size) throws IOException, NetException {
         return ri.read(sc, size);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 3ba8627..ab6dbf1 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -20,13 +20,13 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public int read(SocketChannel sc, int size) throws IOException, NetException {
+    public int read(ISocketChannel sc, int size) throws IOException, NetException {
         while (true) {
             if (size <= 0) {
                 return size;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 96ccafb..f7c3826 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -22,7 +22,6 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.BitSet;
 import java.util.Optional;
 
@@ -31,6 +30,7 @@
 import org.apache.hyracks.api.comm.IConnectionWriterState;
 import org.apache.hyracks.api.comm.MuxDemuxCommand;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.util.JSONUtil;
@@ -160,6 +160,8 @@
 
         private IChannelControlBlock ccb;
 
+        private boolean pendingWriteCompletion = false;
+
         public WriterState() {
             cmdWriteBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
             cmdWriteBuffer.flip();
@@ -168,7 +170,8 @@
         }
 
         boolean writePending() {
-            return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
+            return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0)
+                    || pendingWriteCompletion;
         }
 
         @Override
@@ -181,7 +184,10 @@
             this.ccb = ccb;
         }
 
-        boolean performPendingWrite(SocketChannel sc) throws IOException {
+        boolean performPendingWrite(ISocketChannel sc) throws IOException {
+            if (pendingWriteCompletion && !sc.completeWrite()) {
+                return false;
+            }
             int len = cmdWriteBuffer.remaining();
             if (len > 0) {
                 int written = sc.write(cmdWriteBuffer);
@@ -209,10 +215,16 @@
                 pendingBuffer = null;
                 pendingWriteSize = 0;
             }
+            // must ensure all pending writes are performed before calling ccb.writeComplete()
+            if (sc.isPendingWrite()) {
+                pendingWriteCompletion = true;
+                return false;
+            }
             if (ccb != null) {
                 ccb.writeComplete();
                 ccb = null;
             }
+            pendingWriteCompletion = false;
             return true;
         }
 
@@ -223,7 +235,7 @@
     }
 
     void driveWriterStateMachine() throws IOException, NetException {
-        SocketChannel sc = tcpConnection.getSocketChannel();
+        ISocketChannel sc = tcpConnection.getSocketChannel();
         if (writerState.writePending()) {
             if (!writerState.performPendingWrite(sc)) {
                 return;
@@ -339,9 +351,9 @@
     }
 
     void driveReaderStateMachine() throws IOException, NetException {
-        SocketChannel sc = tcpConnection.getSocketChannel();
+        ISocketChannel sc = tcpConnection.getSocketChannel();
         int chunksRead = 0;
-        while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) {
+        while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE || sc.isPendingRead()) {
             if (readerState.readBuffer.remaining() > 0) {
                 int read = sc.read(readerState.readBuffer);
                 if (read < 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 4ee7e83..39ce408 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -25,6 +25,7 @@
 
 import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -68,13 +69,18 @@
      *            - Number of threads to use for data transfer
      * @param maxConnectionAttempts
      *            - Maximum number of connection attempts
+     * @param channelInterfaceFactory
+     *            - The channel interface factory
+     * @param socketChannelFactory
+     *            - The socket channel factory
      */
     public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads,
-            int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFatory) {
+            int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFactory,
+            ISocketChannelFactory socketChannelFactory) {
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
         this.maxConnectionAttempts = maxConnectionAttempts;
-        this.channelInterfaceFatory = channelInterfaceFatory;
+        this.channelInterfaceFatory = channelInterfaceFactory;
         outgoingConnectionMap = new HashMap<>();
         incomingConnectionMap = new HashMap<>();
         this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
@@ -126,7 +132,7 @@
                     }
                 }
             }
-        }, nThreads);
+        }, nThreads, socketChannelFactory);
         perfCounters = new MuxDemuxPerformanceCounters();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index ff4627a..1814edb 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -22,8 +22,8 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
 
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,7 +38,7 @@
 
     private final TCPEndpoint endpoint;
 
-    private final SocketChannel channel;
+    private final ISocketChannel channel;
     private final InetSocketAddress remoteAddress;
     private final SelectionKey key;
 
@@ -50,26 +50,26 @@
 
     private ConnectionType type;
 
-    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+    public TCPConnection(TCPEndpoint endpoint, ISocketChannel channel, SelectionKey key, Selector selector,
             ConnectionType type) {
         this.endpoint = endpoint;
         this.channel = channel;
         this.key = key;
         this.selector = selector;
         this.type = type;
-        remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+        remoteAddress = (InetSocketAddress) channel.getSocketChannel().socket().getRemoteSocketAddress();
     }
 
     public TCPEndpoint getEndpoint() {
         return endpoint;
     }
 
-    public SocketChannel getSocketChannel() {
+    public ISocketChannel getSocketChannel() {
         return channel;
     }
 
     public InetSocketAddress getLocalAddress() {
-        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+        return (InetSocketAddress) channel.getSocketChannel().socket().getLocalSocketAddress();
     }
 
     public InetSocketAddress getRemoteAddress() {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 05e2175..faec871 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.net.protocols.tcp;
 
 import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.INCOMING;
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.OUTGOING;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -31,7 +33,10 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -52,9 +57,13 @@
 
     private int nextThread;
 
-    public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) {
+    private final ISocketChannelFactory socketChannelFactory;
+
+    public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads,
+            ISocketChannelFactory socketChannelFactory) {
         this.connectionListener = connectionListener;
         this.nThreads = nThreads;
+        this.socketChannelFactory = socketChannelFactory;
     }
 
     public void start(InetSocketAddress localAddress) throws IOException {
@@ -113,6 +122,8 @@
 
         private final List<SocketChannel> workingIncomingConnections;
 
+        private final List<PendingHandshakeConnection> handshakeCompletedConnections;
+
         private final Selector selector;
 
         public IOThread() throws IOException {
@@ -123,6 +134,7 @@
             this.workingPendingConnections = new ArrayList<>();
             this.incomingConnections = new ArrayList<>();
             this.workingIncomingConnections = new ArrayList<>();
+            handshakeCompletedConnections = new ArrayList<>();
             selector = Selector.open();
         }
 
@@ -151,8 +163,7 @@
                                     SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
                                     key.attach(address);
                                 } else {
-                                    SelectionKey key = channel.register(selector, 0);
-                                    createConnection(key, channel);
+                                    socketConnected(address, channel);
                                 }
                             }
                         }
@@ -161,16 +172,16 @@
                     if (!workingIncomingConnections.isEmpty()) {
                         for (SocketChannel channel : workingIncomingConnections) {
                             register(channel);
-                            SelectionKey sKey = channel.register(selector, 0);
-                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
-                                    ConnectionType.INCOMING);
-                            sKey.attach(connection);
-                            synchronized (connectionListener) {
-                                connectionListener.acceptedConnection(connection);
-                            }
+                            connectionReceived(channel);
                         }
                         workingIncomingConnections.clear();
                     }
+                    if (!handshakeCompletedConnections.isEmpty()) {
+                        for (final PendingHandshakeConnection conn : handshakeCompletedConnections) {
+                            handshakeCompleted(conn);
+                        }
+                        handshakeCompletedConnections.clear();
+                    }
                     if (n > 0) {
                         Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                         while (i.hasNext()) {
@@ -211,7 +222,7 @@
                                     }
                                 }
                                 if (finishConnect) {
-                                    createConnection(key, channel);
+                                    socketConnected((InetSocketAddress) key.attachment(), channel);
                                 }
                             }
                         }
@@ -222,13 +233,29 @@
             }
         }
 
-        private void createConnection(SelectionKey key, SocketChannel channel) {
-            TCPConnection connection =
-                    new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
-            key.attach(connection);
-            key.interestOps(0);
-            synchronized (connectionListener) {
-                connectionListener.connectionEstablished(connection);
+        private void handshakeCompleted(PendingHandshakeConnection conn) {
+            try {
+                if (conn.handshakeSuccess) {
+                    final SelectionKey key = conn.socketChannel.getSocketChannel().register(selector, 0);
+                    final TCPConnection tcpConn =
+                            new TCPConnection(TCPEndpoint.this, conn.socketChannel, key, selector, conn.type);
+                    key.attach(tcpConn);
+                    switch (conn.type) {
+                        case INCOMING:
+                            connectionAccepted(tcpConn);
+                            break;
+                        case OUTGOING:
+                            connectionEstablished(tcpConn);
+                            break;
+                        default:
+                            throw new IllegalStateException("Unknown connection type: " + conn.type);
+                    }
+                } else {
+                    handleHandshakeFailure(conn);
+                }
+            } catch (Exception e) {
+                LOGGER.error("failed to establish connection after handshake", e);
+                handleHandshakeFailure(conn);
             }
         }
 
@@ -262,5 +289,75 @@
             NetworkUtil.configure(channel);
             channel.configureBlocking(false);
         }
+
+        private void socketConnected(InetSocketAddress remoteAddress, SocketChannel channel) {
+            final ISocketChannel socketChannel = socketChannelFactory.createClientChannel(channel);
+            final PendingHandshakeConnection conn =
+                    new PendingHandshakeConnection(socketChannel, remoteAddress, OUTGOING);
+            if (socketChannel.requiresHandshake()) {
+                asyncHandshake(conn);
+            } else {
+                conn.handshakeSuccess = true;
+                handshakeCompletedConnections.add(conn);
+            }
+        }
+
+        private void connectionReceived(SocketChannel channel) {
+            final ISocketChannel socketChannel = socketChannelFactory.createServerChannel(channel);
+            final PendingHandshakeConnection conn = new PendingHandshakeConnection(socketChannel, null, INCOMING);
+            if (socketChannel.requiresHandshake()) {
+                asyncHandshake(conn);
+            } else {
+                conn.handshakeSuccess = true;
+                handshakeCompletedConnections.add(conn);
+            }
+        }
+
+        private void asyncHandshake(PendingHandshakeConnection connection) {
+            CompletableFuture.supplyAsync(connection.socketChannel::handshake).exceptionally(ex -> false)
+                    .thenAccept(handshakeSuccess -> handleHandshakeCompletion(handshakeSuccess, connection));
+        }
+
+        private void handleHandshakeCompletion(Boolean handshakeSuccess, PendingHandshakeConnection conn) {
+            conn.handshakeSuccess = handshakeSuccess;
+            handshakeCompletedConnections.add(conn);
+            selector.wakeup();
+        }
+
+        private void connectionEstablished(TCPConnection connection) {
+            synchronized (connectionListener) {
+                connectionListener.connectionEstablished(connection);
+            }
+        }
+
+        private void connectionAccepted(TCPConnection connection) {
+            synchronized (connectionListener) {
+                connectionListener.acceptedConnection(connection);
+            }
+        }
+
+        private void handleHandshakeFailure(PendingHandshakeConnection conn) {
+            NetworkUtil.closeQuietly(conn.socketChannel);
+            if (conn.type == OUTGOING) {
+                synchronized (connectionListener) {
+                    connectionListener.connectionFailure(conn.address, new IOException("handshake failure"));
+                }
+            }
+        }
+    }
+
+    private static class PendingHandshakeConnection {
+
+        private final ISocketChannel socketChannel;
+        private final ConnectionType type;
+        private final InetSocketAddress address;
+        private boolean handshakeSuccess = false;
+
+        PendingHandshakeConnection(ISocketChannel socketChannel, InetSocketAddress address,
+                ConnectionType connectionType) {
+            this.socketChannel = socketChannel;
+            this.type = connectionType;
+            this.address = address;
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
index f9a610c..6d9e7c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface;
 import org.apache.hyracks.util.StorageUtil;
@@ -61,7 +62,7 @@
         readInterface.setFullBufferAcceptor(new ReadFullBufferAcceptor(fullBufferQ));
         readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT, FRAME_SIZE);
         Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get());
-        final SocketChannel socketChannel = mockSocketChannel(ccb);
+        final ISocketChannel socketChannel = mockSocketChannel(ccb);
         final Thread networkFrameReader = new Thread(() -> {
             try {
                 int framesRead = FRAMES_TO_READ_COUNT;
@@ -124,8 +125,8 @@
         return ccb;
     }
 
-    private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException {
-        final SocketChannel sc = Mockito.mock(SocketChannel.class);
+    private ISocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException {
+        final ISocketChannel sc = Mockito.mock(ISocketChannel.class);
         Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> {
             ccb.addPendingCredits(-FRAME_SIZE);
             final ByteBuffer buffer = invocation.getArgumentAt(0, ByteBuffer.class);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index 8f582ba..ec1d21c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -27,6 +27,7 @@
 
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -162,7 +163,7 @@
             }
         };
         return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1, 5,
-                FullFrameChannelInterfaceFactory.INSTANCE);
+                FullFrameChannelInterfaceFactory.INSTANCE, PlainSocketChannelFactory.INSTANCE);
     }
 
     private class ChannelIO {