[ASTERIXDB-2490][NET] Support Encrypted Multiplexed Connections
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Use SocketChannelFactory in multiplex connections
to support both unencrypted and encrypted sockets.
- Adapt TCPEndpoint to socket channels that require
handshake.
- Adapt test cases to API changes.
Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3071
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
index 42ec795..f69c102 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
@@ -19,10 +19,10 @@
package org.apache.asterix.messaging;
import java.io.IOException;
-import java.nio.channels.SocketChannel;
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelReadInterface;
public class MessagingChannelReadInterface extends AbstractChannelReadInterface {
@@ -32,7 +32,7 @@
}
@Override
- public int read(SocketChannel sc, int size) throws IOException, NetException {
+ public int read(ISocketChannel sc, int size) throws IOException, NetException {
while (true) {
if (size <= 0) {
return size;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 357d761..4429219 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -19,9 +19,9 @@
package org.apache.hyracks.api.comm;
import java.io.IOException;
-import java.nio.channels.SocketChannel;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
/**
* Represents the read interface of a {@link IChannelControlBlock}.
@@ -68,7 +68,7 @@
* @throws IOException
* @throws NetException
*/
- public int read(SocketChannel sc, int size) throws IOException, NetException;
+ public int read(ISocketChannel sc, int size) throws IOException, NetException;
/**
* Sets the read credits of this {@link IChannelReadInterface}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
index c238ae3..5fe6ecb 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
@@ -23,6 +23,7 @@
import java.net.SocketAddress;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -35,11 +36,12 @@
private final MuxDemux md;
- public ClientNetworkManager(int nThreads) throws IOException {
+ public ClientNetworkManager(int nThreads, ISocketChannelFactory socketChannelFactory) {
/* This is a connect only socket and does not listen to any incoming connections, so pass null to
* localAddress and listener.
*/
- md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE);
+ md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE,
+ socketChannelFactory);
}
public void start() throws IOException {
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
index a72573c..4d8767f 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -44,7 +44,7 @@
NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort(), socketChannelFactory);
- netManager = new ClientNetworkManager(nReaders);
+ netManager = new ClientNetworkManager(nReaders, socketChannelFactory);
netManager.start();
resultClientCtx = new ResultClientContext(frameSize);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a92fcb6..317d59a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -273,12 +273,12 @@
resultNetworkManager = new ResultNetworkManager(ncConfig.getResultListenAddress(),
ncConfig.getResultListenPort(), resultPartitionManager, ncConfig.getNetThreadCount(),
ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
- FullFrameChannelInterfaceFactory.INSTANCE);
+ FullFrameChannelInterfaceFactory.INSTANCE, networkSecurityManager.getSocketChannelFactory());
if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) {
messagingNetManager = new MessagingNetworkManager(this, ncConfig.getMessagingListenAddress(),
ncConfig.getMessagingListenPort(), ncConfig.getNetThreadCount(),
ncConfig.getMessagingPublicAddress(), ncConfig.getMessagingPublicPort(),
- serviceCtx.getMessagingChannelInterfaceFactory());
+ serviceCtx.getMessagingChannelInterfaceFactory(), networkSecurityManager.getSocketChannelFactory());
}
}
@@ -292,7 +292,8 @@
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager,
ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(),
- ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE);
+ ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE,
+ networkSecurityManager.getSocketChannelFactory());
netManager.start();
startApplication();
init();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
index a37d131..4c7270c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -53,10 +54,11 @@
private final Map<IChannelControlBlock, ICloseableBufferAcceptor> channelFullBufferAcceptor = new HashMap<>();
public MessagingNetworkManager(NodeControllerService ncs, String inetAddress, int inetPort, int nThreads,
- String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory) {
+ String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory,
+ ISocketChannelFactory socketChannelFactory) {
this.ncs = ncs;
md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+ MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory);
publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 3298b78..6876618 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
@@ -60,11 +61,11 @@
public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads,
int nBuffers, String publicInetAddress, int publicInetPort,
- IChannelInterfaceFactory channelInterfaceFactory) {
+ IChannelInterfaceFactory channelInterfaceFactory, ISocketChannelFactory socketChannelFactory) {
this.partitionManager = partitionManager;
this.nBuffers = nBuffers;
md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+ MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory);
// Just save these values for the moment; may be reset in start()
publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index ee821d6..fe8f2af 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
@@ -73,11 +74,11 @@
*/
public ResultNetworkManager(String inetAddress, int inetPort, IResultPartitionManager partitionManager,
int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
- IChannelInterfaceFactory channelInterfaceFactory) {
+ IChannelInterfaceFactory channelInterfaceFactory, ISocketChannelFactory socketChannelFactory) {
this.partitionManager = partitionManager;
this.nBuffers = nBuffers;
md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+ MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory);
// Just save these values for the moment; may be reset in start()
publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 205ecfe..764ec7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -215,10 +215,10 @@
SelectionKey key = i.next();
i.remove();
final SelectableChannel sc = key.channel();
- // do not attempt to read until handle is set (e.g. after handshake is completed)
+ // do not attempt to read/write until handle is set (e.g. after handshake is completed)
if (key.isReadable() && key.attachment() != null) {
read(key);
- } else if (key.isWritable()) {
+ } else if (key.isWritable() && key.attachment() != null) {
write(key);
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 2cddf45..2dca39b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -47,6 +47,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 3a35212..0a26097 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -28,6 +27,7 @@
import org.apache.hyracks.api.comm.IChannelReadInterface;
import org.apache.hyracks.api.comm.IChannelWriteInterface;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
import org.apache.hyracks.util.JSONUtil;
@@ -91,7 +91,7 @@
wi.writeComplete();
}
- synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
+ synchronized int read(ISocketChannel sc, int size) throws IOException, NetException {
return ri.read(sc, size);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 3ba8627..ab6dbf1 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -20,13 +20,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,7 +52,7 @@
}
@Override
- public int read(SocketChannel sc, int size) throws IOException, NetException {
+ public int read(ISocketChannel sc, int size) throws IOException, NetException {
while (true) {
if (size <= 0) {
return size;
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 96ccafb..f7c3826 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -22,7 +22,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.BitSet;
import java.util.Optional;
@@ -31,6 +30,7 @@
import org.apache.hyracks.api.comm.IConnectionWriterState;
import org.apache.hyracks.api.comm.MuxDemuxCommand;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
import org.apache.hyracks.util.JSONUtil;
@@ -160,6 +160,8 @@
private IChannelControlBlock ccb;
+ private boolean pendingWriteCompletion = false;
+
public WriterState() {
cmdWriteBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
cmdWriteBuffer.flip();
@@ -168,7 +170,8 @@
}
boolean writePending() {
- return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0);
+ return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0)
+ || pendingWriteCompletion;
}
@Override
@@ -181,7 +184,10 @@
this.ccb = ccb;
}
- boolean performPendingWrite(SocketChannel sc) throws IOException {
+ boolean performPendingWrite(ISocketChannel sc) throws IOException {
+ if (pendingWriteCompletion && !sc.completeWrite()) {
+ return false;
+ }
int len = cmdWriteBuffer.remaining();
if (len > 0) {
int written = sc.write(cmdWriteBuffer);
@@ -209,10 +215,16 @@
pendingBuffer = null;
pendingWriteSize = 0;
}
+ // must ensure all pending writes are performed before calling ccb.writeComplete()
+ if (sc.isPendingWrite()) {
+ pendingWriteCompletion = true;
+ return false;
+ }
if (ccb != null) {
ccb.writeComplete();
ccb = null;
}
+ pendingWriteCompletion = false;
return true;
}
@@ -223,7 +235,7 @@
}
void driveWriterStateMachine() throws IOException, NetException {
- SocketChannel sc = tcpConnection.getSocketChannel();
+ ISocketChannel sc = tcpConnection.getSocketChannel();
if (writerState.writePending()) {
if (!writerState.performPendingWrite(sc)) {
return;
@@ -339,9 +351,9 @@
}
void driveReaderStateMachine() throws IOException, NetException {
- SocketChannel sc = tcpConnection.getSocketChannel();
+ ISocketChannel sc = tcpConnection.getSocketChannel();
int chunksRead = 0;
- while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) {
+ while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE || sc.isPendingRead()) {
if (readerState.readBuffer.remaining() > 0) {
int read = sc.read(readerState.readBuffer);
if (read < 0) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 4ee7e83..39ce408 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -68,13 +69,18 @@
* - Number of threads to use for data transfer
* @param maxConnectionAttempts
* - Maximum number of connection attempts
+ * @param channelInterfaceFactory
+ * - The channel interface factory
+ * @param socketChannelFactory
+ * - The socket channel factory
*/
public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads,
- int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFatory) {
+ int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFactory,
+ ISocketChannelFactory socketChannelFactory) {
this.localAddress = localAddress;
this.channelOpenListener = listener;
this.maxConnectionAttempts = maxConnectionAttempts;
- this.channelInterfaceFatory = channelInterfaceFatory;
+ this.channelInterfaceFatory = channelInterfaceFactory;
outgoingConnectionMap = new HashMap<>();
incomingConnectionMap = new HashMap<>();
this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
@@ -126,7 +132,7 @@
}
}
}
- }, nThreads);
+ }, nThreads, socketChannelFactory);
perfCounters = new MuxDemuxPerformanceCounters();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index ff4627a..1814edb 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -22,8 +22,8 @@
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,7 +38,7 @@
private final TCPEndpoint endpoint;
- private final SocketChannel channel;
+ private final ISocketChannel channel;
private final InetSocketAddress remoteAddress;
private final SelectionKey key;
@@ -50,26 +50,26 @@
private ConnectionType type;
- public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector,
+ public TCPConnection(TCPEndpoint endpoint, ISocketChannel channel, SelectionKey key, Selector selector,
ConnectionType type) {
this.endpoint = endpoint;
this.channel = channel;
this.key = key;
this.selector = selector;
this.type = type;
- remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
+ remoteAddress = (InetSocketAddress) channel.getSocketChannel().socket().getRemoteSocketAddress();
}
public TCPEndpoint getEndpoint() {
return endpoint;
}
- public SocketChannel getSocketChannel() {
+ public ISocketChannel getSocketChannel() {
return channel;
}
public InetSocketAddress getLocalAddress() {
- return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+ return (InetSocketAddress) channel.getSocketChannel().socket().getLocalSocketAddress();
}
public InetSocketAddress getRemoteAddress() {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 05e2175..faec871 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.net.protocols.tcp;
import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.INCOMING;
+import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.OUTGOING;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -31,7 +33,10 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,9 +57,13 @@
private int nextThread;
- public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) {
+ private final ISocketChannelFactory socketChannelFactory;
+
+ public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads,
+ ISocketChannelFactory socketChannelFactory) {
this.connectionListener = connectionListener;
this.nThreads = nThreads;
+ this.socketChannelFactory = socketChannelFactory;
}
public void start(InetSocketAddress localAddress) throws IOException {
@@ -113,6 +122,8 @@
private final List<SocketChannel> workingIncomingConnections;
+ private final List<PendingHandshakeConnection> handshakeCompletedConnections;
+
private final Selector selector;
public IOThread() throws IOException {
@@ -123,6 +134,7 @@
this.workingPendingConnections = new ArrayList<>();
this.incomingConnections = new ArrayList<>();
this.workingIncomingConnections = new ArrayList<>();
+ handshakeCompletedConnections = new ArrayList<>();
selector = Selector.open();
}
@@ -151,8 +163,7 @@
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
key.attach(address);
} else {
- SelectionKey key = channel.register(selector, 0);
- createConnection(key, channel);
+ socketConnected(address, channel);
}
}
}
@@ -161,16 +172,16 @@
if (!workingIncomingConnections.isEmpty()) {
for (SocketChannel channel : workingIncomingConnections) {
register(channel);
- SelectionKey sKey = channel.register(selector, 0);
- TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector,
- ConnectionType.INCOMING);
- sKey.attach(connection);
- synchronized (connectionListener) {
- connectionListener.acceptedConnection(connection);
- }
+ connectionReceived(channel);
}
workingIncomingConnections.clear();
}
+ if (!handshakeCompletedConnections.isEmpty()) {
+ for (final PendingHandshakeConnection conn : handshakeCompletedConnections) {
+ handshakeCompleted(conn);
+ }
+ handshakeCompletedConnections.clear();
+ }
if (n > 0) {
Iterator<SelectionKey> i = selector.selectedKeys().iterator();
while (i.hasNext()) {
@@ -211,7 +222,7 @@
}
}
if (finishConnect) {
- createConnection(key, channel);
+ socketConnected((InetSocketAddress) key.attachment(), channel);
}
}
}
@@ -222,13 +233,29 @@
}
}
- private void createConnection(SelectionKey key, SocketChannel channel) {
- TCPConnection connection =
- new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING);
- key.attach(connection);
- key.interestOps(0);
- synchronized (connectionListener) {
- connectionListener.connectionEstablished(connection);
+ private void handshakeCompleted(PendingHandshakeConnection conn) {
+ try {
+ if (conn.handshakeSuccess) {
+ final SelectionKey key = conn.socketChannel.getSocketChannel().register(selector, 0);
+ final TCPConnection tcpConn =
+ new TCPConnection(TCPEndpoint.this, conn.socketChannel, key, selector, conn.type);
+ key.attach(tcpConn);
+ switch (conn.type) {
+ case INCOMING:
+ connectionAccepted(tcpConn);
+ break;
+ case OUTGOING:
+ connectionEstablished(tcpConn);
+ break;
+ default:
+ throw new IllegalStateException("Unknown connection type: " + conn.type);
+ }
+ } else {
+ handleHandshakeFailure(conn);
+ }
+ } catch (Exception e) {
+ LOGGER.error("failed to establish connection after handshake", e);
+ handleHandshakeFailure(conn);
}
}
@@ -262,5 +289,75 @@
NetworkUtil.configure(channel);
channel.configureBlocking(false);
}
+
+ private void socketConnected(InetSocketAddress remoteAddress, SocketChannel channel) {
+ final ISocketChannel socketChannel = socketChannelFactory.createClientChannel(channel);
+ final PendingHandshakeConnection conn =
+ new PendingHandshakeConnection(socketChannel, remoteAddress, OUTGOING);
+ if (socketChannel.requiresHandshake()) {
+ asyncHandshake(conn);
+ } else {
+ conn.handshakeSuccess = true;
+ handshakeCompletedConnections.add(conn);
+ }
+ }
+
+ private void connectionReceived(SocketChannel channel) {
+ final ISocketChannel socketChannel = socketChannelFactory.createServerChannel(channel);
+ final PendingHandshakeConnection conn = new PendingHandshakeConnection(socketChannel, null, INCOMING);
+ if (socketChannel.requiresHandshake()) {
+ asyncHandshake(conn);
+ } else {
+ conn.handshakeSuccess = true;
+ handshakeCompletedConnections.add(conn);
+ }
+ }
+
+ private void asyncHandshake(PendingHandshakeConnection connection) {
+ CompletableFuture.supplyAsync(connection.socketChannel::handshake).exceptionally(ex -> false)
+ .thenAccept(handshakeSuccess -> handleHandshakeCompletion(handshakeSuccess, connection));
+ }
+
+ private void handleHandshakeCompletion(Boolean handshakeSuccess, PendingHandshakeConnection conn) {
+ conn.handshakeSuccess = handshakeSuccess;
+ handshakeCompletedConnections.add(conn);
+ selector.wakeup();
+ }
+
+ private void connectionEstablished(TCPConnection connection) {
+ synchronized (connectionListener) {
+ connectionListener.connectionEstablished(connection);
+ }
+ }
+
+ private void connectionAccepted(TCPConnection connection) {
+ synchronized (connectionListener) {
+ connectionListener.acceptedConnection(connection);
+ }
+ }
+
+ private void handleHandshakeFailure(PendingHandshakeConnection conn) {
+ NetworkUtil.closeQuietly(conn.socketChannel);
+ if (conn.type == OUTGOING) {
+ synchronized (connectionListener) {
+ connectionListener.connectionFailure(conn.address, new IOException("handshake failure"));
+ }
+ }
+ }
+ }
+
+ private static class PendingHandshakeConnection {
+
+ private final ISocketChannel socketChannel;
+ private final ConnectionType type;
+ private final InetSocketAddress address;
+ private boolean handshakeSuccess = false;
+
+ PendingHandshakeConnection(ISocketChannel socketChannel, InetSocketAddress address,
+ ConnectionType connectionType) {
+ this.socketChannel = socketChannel;
+ this.type = connectionType;
+ this.address = address;
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
index f9a610c..6d9e7c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface;
import org.apache.hyracks.util.StorageUtil;
@@ -61,7 +62,7 @@
readInterface.setFullBufferAcceptor(new ReadFullBufferAcceptor(fullBufferQ));
readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT, FRAME_SIZE);
Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get());
- final SocketChannel socketChannel = mockSocketChannel(ccb);
+ final ISocketChannel socketChannel = mockSocketChannel(ccb);
final Thread networkFrameReader = new Thread(() -> {
try {
int framesRead = FRAMES_TO_READ_COUNT;
@@ -124,8 +125,8 @@
return ccb;
}
- private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException {
- final SocketChannel sc = Mockito.mock(SocketChannel.class);
+ private ISocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException {
+ final ISocketChannel sc = Mockito.mock(ISocketChannel.class);
Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> {
ccb.addPendingCredits(-FRAME_SIZE);
final ByteBuffer buffer = invocation.getArgumentAt(0, ByteBuffer.class);
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index 8f582ba..ec1d21c 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -162,7 +163,7 @@
}
};
return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1, 5,
- FullFrameChannelInterfaceFactory.INSTANCE);
+ FullFrameChannelInterfaceFactory.INSTANCE, PlainSocketChannelFactory.INSTANCE);
}
private class ChannelIO {