Added multi-threading support to TCPEndpoint
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1010 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index c55f34a..574f552 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -52,6 +52,9 @@
@Option(name = "-dcache-client-path", usage = "Sets the path to store the files retrieved from the DCache server (default /tmp/dcache-client)")
public String dcacheClientPath = "/tmp/dcache-client";
+ @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+ public int nNetThreads = 1;
+
public void toCommandLine(List<String> cList) {
cList.add("-cc-host");
cList.add(ccHost);
@@ -75,5 +78,7 @@
}
cList.add("-dcache-client-path");
cList.add(dcacheClientPath);
+ cList.add("-net-thread-count");
+ cList.add(String.valueOf(nNetThreads));
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ea18ad7..e211859 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -133,7 +133,7 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager);
+ netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 8e2a8ad..f94be22 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -51,10 +51,10 @@
private NetworkAddress networkAddress;
public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
- IPartitionRequestListener partitionRequestListener) throws IOException {
+ IPartitionRequestListener partitionRequestListener, int nThreads) throws IOException {
this.ctx = ctx;
this.partitionRequestListener = partitionRequestListener;
- md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener());
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads);
}
public void start() throws IOException {
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 1ed73c5..b041e2c 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -34,7 +34,7 @@
private final PerformanceCounters perfCounters;
- public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener) {
+ public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
this.localAddress = localAddress;
this.channelOpenListener = listener;
connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
@@ -58,7 +58,7 @@
connection.setEventListener(mConn);
connection.setAttachment(mConn);
}
- });
+ }, nThreads);
perfCounters = new PerformanceCounters();
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index e7a237d..37641cb 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -29,14 +29,19 @@
public class TCPEndpoint {
private final ITCPConnectionListener connectionListener;
+ private final int nThreads;
+
private ServerSocketChannel serverSocketChannel;
private InetSocketAddress localAddress;
- private IOThread ioThread;
+ private IOThread[] ioThreads;
- public TCPEndpoint(ITCPConnectionListener connectionListener) {
+ private int nextThread;
+
+ public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) {
this.connectionListener = connectionListener;
+ this.nThreads = nThreads;
}
public void start(InetSocketAddress localAddress) throws IOException {
@@ -44,13 +49,30 @@
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(localAddress);
this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
- ioThread = new IOThread();
- ioThread.registerServerSocket(serverSocketChannel);
- ioThread.start();
+ ioThreads = new IOThread[nThreads];
+ for (int i = 0; i < ioThreads.length; ++i) {
+ ioThreads[i] = new IOThread();
+ }
+ ioThreads[0].registerServerSocket(serverSocketChannel);
+ for (int i = 0; i < ioThreads.length; ++i) {
+ ioThreads[i].start();
+ }
+ }
+
+ private synchronized int getNextThread() {
+ int result = nextThread;
+ nextThread = (nextThread + 1) % nThreads;
+ return result;
}
public void initiateConnection(InetSocketAddress remoteAddress) {
- ioThread.initiateConnection(remoteAddress);
+ int targetThread = getNextThread();
+ ioThreads[targetThread].initiateConnection(remoteAddress);
+ }
+
+ private void addIncomingConnection(SocketChannel channel) {
+ int targetThread = getNextThread();
+ ioThreads[targetThread].addIncomingConnection(channel);
}
public InetSocketAddress getLocalAddress() {
@@ -60,6 +82,8 @@
private class IOThread extends Thread {
private final List<InetSocketAddress>[] pendingConnections;
+ private final List<SocketChannel>[] incomingConnections;
+
private int writerIndex;
private int readerIndex;
@@ -71,6 +95,7 @@
setPriority(MAX_PRIORITY);
this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
new ArrayList<InetSocketAddress>() };
+ this.incomingConnections = new List[] { new ArrayList<SocketChannel>(), new ArrayList<SocketChannel>() };
writerIndex = 0;
readerIndex = 1;
selector = Selector.open();
@@ -95,6 +120,18 @@
}
pendingConnections[readerIndex].clear();
}
+ if (!incomingConnections[readerIndex].isEmpty()) {
+ for (SocketChannel channel : incomingConnections[readerIndex]) {
+ channel.configureBlocking(false);
+ SelectionKey sKey = channel.register(selector, 0);
+ TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+ sKey.attach(connection);
+ synchronized (connectionListener) {
+ connectionListener.acceptedConnection(connection);
+ }
+ }
+ incomingConnections[readerIndex].clear();
+ }
if (n > 0) {
Iterator<SelectionKey> i = selector.selectedKeys().iterator();
while (i.hasNext()) {
@@ -111,11 +148,7 @@
if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
- channel.configureBlocking(false);
- SelectionKey sKey = channel.register(selector, 0);
- TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
- sKey.attach(connection);
- connectionListener.acceptedConnection(connection);
+ addIncomingConnection(channel);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
if (channel.finishConnect()) {
@@ -142,6 +175,11 @@
selector.wakeup();
}
+ synchronized void addIncomingConnection(SocketChannel channel) {
+ incomingConnections[writerIndex].add(channel);
+ selector.wakeup();
+ }
+
void registerServerSocket(ServerSocketChannel serverSocketChannel) throws IOException {
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
diff --git a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
index c09f7b4..31bc2df 100644
--- a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
+++ b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -158,7 +158,7 @@
}.start();
}
};
- return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener);
+ return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1);
}
private class ChannelIO {