Minor refactoring of TCP Endpoint
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1008 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 3892b83..e7a237d 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -29,25 +29,14 @@
public class TCPEndpoint {
private final ITCPConnectionListener connectionListener;
- private final List<InetSocketAddress>[] pendingConnections;
-
private ServerSocketChannel serverSocketChannel;
private InetSocketAddress localAddress;
- private Selector selector;
-
private IOThread ioThread;
- private int writerIndex;
-
- private int readerIndex;
-
public TCPEndpoint(ITCPConnectionListener connectionListener) {
this.connectionListener = connectionListener;
- this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(), new ArrayList<InetSocketAddress>() };
- writerIndex = 0;
- readerIndex = 1;
}
public void start(InetSocketAddress localAddress) throws IOException {
@@ -55,22 +44,13 @@
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(localAddress);
this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
- serverSocketChannel.configureBlocking(false);
- selector = Selector.open();
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ioThread = new IOThread();
+ ioThread.registerServerSocket(serverSocketChannel);
ioThread.start();
}
- public synchronized void initiateConnection(InetSocketAddress remoteAddress) {
- pendingConnections[writerIndex].add(remoteAddress);
- selector.wakeup();
- }
-
- private synchronized void swapReadersAndWriters() {
- int temp = readerIndex;
- readerIndex = writerIndex;
- writerIndex = temp;
+ public void initiateConnection(InetSocketAddress remoteAddress) {
+ ioThread.initiateConnection(remoteAddress);
}
public InetSocketAddress getLocalAddress() {
@@ -78,8 +58,22 @@
}
private class IOThread extends Thread {
- public IOThread() {
+ private final List<InetSocketAddress>[] pendingConnections;
+
+ private int writerIndex;
+
+ private int readerIndex;
+
+ private Selector selector;
+
+ public IOThread() throws IOException {
super("TCPEndpoint IO Thread");
+ setPriority(MAX_PRIORITY);
+ this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
+ new ArrayList<InetSocketAddress>() };
+ writerIndex = 0;
+ readerIndex = 1;
+ selector = Selector.open();
}
@Override
@@ -142,5 +136,21 @@
key.interestOps(0);
connectionListener.connectionEstablished(connection);
}
+
+ synchronized void initiateConnection(InetSocketAddress remoteAddress) {
+ pendingConnections[writerIndex].add(remoteAddress);
+ selector.wakeup();
+ }
+
+ void registerServerSocket(ServerSocketChannel serverSocketChannel) throws IOException {
+ serverSocketChannel.configureBlocking(false);
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ }
+
+ private synchronized void swapReadersAndWriters() {
+ int temp = readerIndex;
+ readerIndex = writerIndex;
+ writerIndex = temp;
+ }
}
}
\ No newline at end of file