Minor refactoring of TCP Endpoint

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1008 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 3892b83..e7a237d 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -29,25 +29,14 @@
 public class TCPEndpoint {
     private final ITCPConnectionListener connectionListener;
 
-    private final List<InetSocketAddress>[] pendingConnections;
-
     private ServerSocketChannel serverSocketChannel;
 
     private InetSocketAddress localAddress;
 
-    private Selector selector;
-
     private IOThread ioThread;
 
-    private int writerIndex;
-
-    private int readerIndex;
-
     public TCPEndpoint(ITCPConnectionListener connectionListener) {
         this.connectionListener = connectionListener;
-        this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(), new ArrayList<InetSocketAddress>() };
-        writerIndex = 0;
-        readerIndex = 1;
     }
 
     public void start(InetSocketAddress localAddress) throws IOException {
@@ -55,22 +44,13 @@
         ServerSocket serverSocket = serverSocketChannel.socket();
         serverSocket.bind(localAddress);
         this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
-        serverSocketChannel.configureBlocking(false);
-        selector = Selector.open();
-        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         ioThread = new IOThread();
+        ioThread.registerServerSocket(serverSocketChannel);
         ioThread.start();
     }
 
-    public synchronized void initiateConnection(InetSocketAddress remoteAddress) {
-        pendingConnections[writerIndex].add(remoteAddress);
-        selector.wakeup();
-    }
-
-    private synchronized void swapReadersAndWriters() {
-        int temp = readerIndex;
-        readerIndex = writerIndex;
-        writerIndex = temp;
+    public void initiateConnection(InetSocketAddress remoteAddress) {
+        ioThread.initiateConnection(remoteAddress);
     }
 
     public InetSocketAddress getLocalAddress() {
@@ -78,8 +58,22 @@
     }
 
     private class IOThread extends Thread {
-        public IOThread() {
+        private final List<InetSocketAddress>[] pendingConnections;
+
+        private int writerIndex;
+
+        private int readerIndex;
+
+        private Selector selector;
+
+        public IOThread() throws IOException {
             super("TCPEndpoint IO Thread");
+            setPriority(MAX_PRIORITY);
+            this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
+                    new ArrayList<InetSocketAddress>() };
+            writerIndex = 0;
+            readerIndex = 1;
+            selector = Selector.open();
         }
 
         @Override
@@ -142,5 +136,21 @@
             key.interestOps(0);
             connectionListener.connectionEstablished(connection);
         }
+
+        synchronized void initiateConnection(InetSocketAddress remoteAddress) {
+            pendingConnections[writerIndex].add(remoteAddress);
+            selector.wakeup();
+        }
+
+        void registerServerSocket(ServerSocketChannel serverSocketChannel) throws IOException {
+            serverSocketChannel.configureBlocking(false);
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+        }
+
+        private synchronized void swapReadersAndWriters() {
+            int temp = readerIndex;
+            readerIndex = writerIndex;
+            writerIndex = temp;
+        }
     }
 }
\ No newline at end of file