Fixed socket distribution among threads

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1040 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 37641cb..d73ba21 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -70,7 +70,7 @@
         ioThreads[targetThread].initiateConnection(remoteAddress);
     }
 
-    private void addIncomingConnection(SocketChannel channel) {
+    private void distributeIncomingConnection(SocketChannel channel) {
         int targetThread = getNextThread();
         ioThreads[targetThread].addIncomingConnection(channel);
     }
@@ -80,24 +80,23 @@
     }
 
     private class IOThread extends Thread {
-        private final List<InetSocketAddress>[] pendingConnections;
+        private final List<InetSocketAddress> pendingConnections;
 
-        private final List<SocketChannel>[] incomingConnections;
+        private final List<InetSocketAddress> workingPendingConnections;
 
-        private int writerIndex;
+        private final List<SocketChannel> incomingConnections;
 
-        private int readerIndex;
+        private final List<SocketChannel> workingIncomingConnections;
 
         private Selector selector;
 
         public IOThread() throws IOException {
             super("TCPEndpoint IO Thread");
             setPriority(MAX_PRIORITY);
-            this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
-                    new ArrayList<InetSocketAddress>() };
-            this.incomingConnections = new List[] { new ArrayList<SocketChannel>(), new ArrayList<SocketChannel>() };
-            writerIndex = 0;
-            readerIndex = 1;
+            this.pendingConnections = new ArrayList<InetSocketAddress>();
+            this.workingPendingConnections = new ArrayList<InetSocketAddress>();
+            this.incomingConnections = new ArrayList<SocketChannel>();
+            this.workingIncomingConnections = new ArrayList<SocketChannel>();
             selector = Selector.open();
         }
 
@@ -106,9 +105,9 @@
             while (true) {
                 try {
                     int n = selector.select();
-                    swapReadersAndWriters();
-                    if (!pendingConnections[readerIndex].isEmpty()) {
-                        for (InetSocketAddress address : pendingConnections[readerIndex]) {
+                    collectOutstandingWork();
+                    if (!workingPendingConnections.isEmpty()) {
+                        for (InetSocketAddress address : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
                             channel.configureBlocking(false);
                             if (!channel.connect(address)) {
@@ -118,10 +117,10 @@
                                 createConnection(key, channel);
                             }
                         }
-                        pendingConnections[readerIndex].clear();
+                        workingPendingConnections.clear();
                     }
-                    if (!incomingConnections[readerIndex].isEmpty()) {
-                        for (SocketChannel channel : incomingConnections[readerIndex]) {
+                    if (!workingIncomingConnections.isEmpty()) {
+                        for (SocketChannel channel : workingIncomingConnections) {
                             channel.configureBlocking(false);
                             SelectionKey sKey = channel.register(selector, 0);
                             TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
@@ -130,7 +129,7 @@
                                 connectionListener.acceptedConnection(connection);
                             }
                         }
-                        incomingConnections[readerIndex].clear();
+                        workingIncomingConnections.clear();
                     }
                     if (n > 0) {
                         Iterator<SelectionKey> i = selector.selectedKeys().iterator();
@@ -148,7 +147,7 @@
                             if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
-                                addIncomingConnection(channel);
+                                distributeIncomingConnection(channel);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
                                 if (channel.finishConnect()) {
@@ -171,12 +170,12 @@
         }
 
         synchronized void initiateConnection(InetSocketAddress remoteAddress) {
-            pendingConnections[writerIndex].add(remoteAddress);
+            pendingConnections.add(remoteAddress);
             selector.wakeup();
         }
 
         synchronized void addIncomingConnection(SocketChannel channel) {
-            incomingConnections[writerIndex].add(channel);
+            incomingConnections.add(channel);
             selector.wakeup();
         }
 
@@ -185,10 +184,15 @@
             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         }
 
-        private synchronized void swapReadersAndWriters() {
-            int temp = readerIndex;
-            readerIndex = writerIndex;
-            writerIndex = temp;
+        private synchronized void collectOutstandingWork() {
+            if (!pendingConnections.isEmpty()) {
+                workingPendingConnections.addAll(pendingConnections);
+                pendingConnections.clear();
+            }
+            if (!incomingConnections.isEmpty()) {
+                workingIncomingConnections.addAll(incomingConnections);
+                incomingConnections.clear();
+            }
         }
     }
 }
\ No newline at end of file