Fixed socket distribution among threads
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1040 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 37641cb..d73ba21 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -70,7 +70,7 @@
ioThreads[targetThread].initiateConnection(remoteAddress);
}
- private void addIncomingConnection(SocketChannel channel) {
+ private void distributeIncomingConnection(SocketChannel channel) {
int targetThread = getNextThread();
ioThreads[targetThread].addIncomingConnection(channel);
}
@@ -80,24 +80,23 @@
}
private class IOThread extends Thread {
- private final List<InetSocketAddress>[] pendingConnections;
+ private final List<InetSocketAddress> pendingConnections;
- private final List<SocketChannel>[] incomingConnections;
+ private final List<InetSocketAddress> workingPendingConnections;
- private int writerIndex;
+ private final List<SocketChannel> incomingConnections;
- private int readerIndex;
+ private final List<SocketChannel> workingIncomingConnections;
private Selector selector;
public IOThread() throws IOException {
super("TCPEndpoint IO Thread");
setPriority(MAX_PRIORITY);
- this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
- new ArrayList<InetSocketAddress>() };
- this.incomingConnections = new List[] { new ArrayList<SocketChannel>(), new ArrayList<SocketChannel>() };
- writerIndex = 0;
- readerIndex = 1;
+ this.pendingConnections = new ArrayList<InetSocketAddress>();
+ this.workingPendingConnections = new ArrayList<InetSocketAddress>();
+ this.incomingConnections = new ArrayList<SocketChannel>();
+ this.workingIncomingConnections = new ArrayList<SocketChannel>();
selector = Selector.open();
}
@@ -106,9 +105,9 @@
while (true) {
try {
int n = selector.select();
- swapReadersAndWriters();
- if (!pendingConnections[readerIndex].isEmpty()) {
- for (InetSocketAddress address : pendingConnections[readerIndex]) {
+ collectOutstandingWork();
+ if (!workingPendingConnections.isEmpty()) {
+ for (InetSocketAddress address : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
if (!channel.connect(address)) {
@@ -118,10 +117,10 @@
createConnection(key, channel);
}
}
- pendingConnections[readerIndex].clear();
+ workingPendingConnections.clear();
}
- if (!incomingConnections[readerIndex].isEmpty()) {
- for (SocketChannel channel : incomingConnections[readerIndex]) {
+ if (!workingIncomingConnections.isEmpty()) {
+ for (SocketChannel channel : workingIncomingConnections) {
channel.configureBlocking(false);
SelectionKey sKey = channel.register(selector, 0);
TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
@@ -130,7 +129,7 @@
connectionListener.acceptedConnection(connection);
}
}
- incomingConnections[readerIndex].clear();
+ workingIncomingConnections.clear();
}
if (n > 0) {
Iterator<SelectionKey> i = selector.selectedKeys().iterator();
@@ -148,7 +147,7 @@
if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
- addIncomingConnection(channel);
+ distributeIncomingConnection(channel);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
if (channel.finishConnect()) {
@@ -171,12 +170,12 @@
}
synchronized void initiateConnection(InetSocketAddress remoteAddress) {
- pendingConnections[writerIndex].add(remoteAddress);
+ pendingConnections.add(remoteAddress);
selector.wakeup();
}
synchronized void addIncomingConnection(SocketChannel channel) {
- incomingConnections[writerIndex].add(channel);
+ incomingConnections.add(channel);
selector.wakeup();
}
@@ -185,10 +184,15 @@
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
- private synchronized void swapReadersAndWriters() {
- int temp = readerIndex;
- readerIndex = writerIndex;
- writerIndex = temp;
+ private synchronized void collectOutstandingWork() {
+ if (!pendingConnections.isEmpty()) {
+ workingPendingConnections.addAll(pendingConnections);
+ pendingConnections.clear();
+ }
+ if (!incomingConnections.isEmpty()) {
+ workingIncomingConnections.addAll(incomingConnections);
+ incomingConnections.clear();
+ }
}
}
}
\ No newline at end of file