Added multi-threading support to TCPEndpoint

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1010 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index c55f34a..574f552 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -52,6 +52,9 @@
     @Option(name = "-dcache-client-path", usage = "Sets the path to store the files retrieved from the DCache server (default /tmp/dcache-client)")
     public String dcacheClientPath = "/tmp/dcache-client";
 
+    @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+    public int nNetThreads = 1;
+
     public void toCommandLine(List<String> cList) {
         cList.add("-cc-host");
         cList.add(ccHost);
@@ -75,5 +78,7 @@
         }
         cList.add("-dcache-client-path");
         cList.add(dcacheClientPath);
+        cList.add("-net-thread-count");
+        cList.add(String.valueOf(nNetThreads));
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ea18ad7..e211859 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -133,7 +133,7 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager);
+        netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
 
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 8e2a8ad..f94be22 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -51,10 +51,10 @@
     private NetworkAddress networkAddress;
 
     public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
-            IPartitionRequestListener partitionRequestListener) throws IOException {
+            IPartitionRequestListener partitionRequestListener, int nThreads) throws IOException {
         this.ctx = ctx;
         this.partitionRequestListener = partitionRequestListener;
-        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener());
+        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads);
     }
 
     public void start() throws IOException {
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 1ed73c5..b041e2c 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -34,7 +34,7 @@
 
     private final PerformanceCounters perfCounters;
 
-    public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener) {
+    public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
         connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
@@ -58,7 +58,7 @@
                 connection.setEventListener(mConn);
                 connection.setAttachment(mConn);
             }
-        });
+        }, nThreads);
         perfCounters = new PerformanceCounters();
     }
 
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index e7a237d..37641cb 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -29,14 +29,19 @@
 public class TCPEndpoint {
     private final ITCPConnectionListener connectionListener;
 
+    private final int nThreads;
+
     private ServerSocketChannel serverSocketChannel;
 
     private InetSocketAddress localAddress;
 
-    private IOThread ioThread;
+    private IOThread[] ioThreads;
 
-    public TCPEndpoint(ITCPConnectionListener connectionListener) {
+    private int nextThread;
+
+    public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) {
         this.connectionListener = connectionListener;
+        this.nThreads = nThreads;
     }
 
     public void start(InetSocketAddress localAddress) throws IOException {
@@ -44,13 +49,30 @@
         ServerSocket serverSocket = serverSocketChannel.socket();
         serverSocket.bind(localAddress);
         this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
-        ioThread = new IOThread();
-        ioThread.registerServerSocket(serverSocketChannel);
-        ioThread.start();
+        ioThreads = new IOThread[nThreads];
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i] = new IOThread();
+        }
+        ioThreads[0].registerServerSocket(serverSocketChannel);
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i].start();
+        }
+    }
+
+    private synchronized int getNextThread() {
+        int result = nextThread;
+        nextThread = (nextThread + 1) % nThreads;
+        return result;
     }
 
     public void initiateConnection(InetSocketAddress remoteAddress) {
-        ioThread.initiateConnection(remoteAddress);
+        int targetThread = getNextThread();
+        ioThreads[targetThread].initiateConnection(remoteAddress);
+    }
+
+    private void addIncomingConnection(SocketChannel channel) {
+        int targetThread = getNextThread();
+        ioThreads[targetThread].addIncomingConnection(channel);
     }
 
     public InetSocketAddress getLocalAddress() {
@@ -60,6 +82,8 @@
     private class IOThread extends Thread {
         private final List<InetSocketAddress>[] pendingConnections;
 
+        private final List<SocketChannel>[] incomingConnections;
+
         private int writerIndex;
 
         private int readerIndex;
@@ -71,6 +95,7 @@
             setPriority(MAX_PRIORITY);
             this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
                     new ArrayList<InetSocketAddress>() };
+            this.incomingConnections = new List[] { new ArrayList<SocketChannel>(), new ArrayList<SocketChannel>() };
             writerIndex = 0;
             readerIndex = 1;
             selector = Selector.open();
@@ -95,6 +120,18 @@
                         }
                         pendingConnections[readerIndex].clear();
                     }
+                    if (!incomingConnections[readerIndex].isEmpty()) {
+                        for (SocketChannel channel : incomingConnections[readerIndex]) {
+                            channel.configureBlocking(false);
+                            SelectionKey sKey = channel.register(selector, 0);
+                            TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
+                            sKey.attach(connection);
+                            synchronized (connectionListener) {
+                                connectionListener.acceptedConnection(connection);
+                            }
+                        }
+                        incomingConnections[readerIndex].clear();
+                    }
                     if (n > 0) {
                         Iterator<SelectionKey> i = selector.selectedKeys().iterator();
                         while (i.hasNext()) {
@@ -111,11 +148,7 @@
                             if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
-                                channel.configureBlocking(false);
-                                SelectionKey sKey = channel.register(selector, 0);
-                                TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
-                                sKey.attach(connection);
-                                connectionListener.acceptedConnection(connection);
+                                addIncomingConnection(channel);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
                                 if (channel.finishConnect()) {
@@ -142,6 +175,11 @@
             selector.wakeup();
         }
 
+        synchronized void addIncomingConnection(SocketChannel channel) {
+            incomingConnections[writerIndex].add(channel);
+            selector.wakeup();
+        }
+
         void registerServerSocket(ServerSocketChannel serverSocketChannel) throws IOException {
             serverSocketChannel.configureBlocking(false);
             serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
diff --git a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
index c09f7b4..31bc2df 100644
--- a/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
+++ b/hyracks-net/src/test/java/edu/uci/ics/hyracks/net/tests/NetTest.java
@@ -158,7 +158,7 @@
                 }.start();
             }
         };
-        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener);
+        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1);
     }
 
     private class ChannelIO {