Just use null as values to localAddress and listener for non listening TCP endpoints instead of copy pasting the constructor method.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2537 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
index 9d8b5a4..7aef8b9 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
@@ -31,7 +31,10 @@
     private final MuxDemux md;
 
     public ClientNetworkManager(int nThreads) throws IOException {
-        md = new MuxDemux(nThreads, MAX_CONNECTION_ATTEMPTS);
+        /* This is a connect only socket and does not listen to any incoming connections, so pass null to
+         * localAddress and listener.
+         */
+        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS);
     }
 
     public void start() throws IOException {
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 541253f..e4df6b9 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -48,9 +48,9 @@
      * Constructor.
      * 
      * @param localAddress
-     *            - TCP/IP socket address to listen on
+     *            - TCP/IP socket address to listen on. Null for non-listening unidirectional sockets
      * @param listener
-     *            - Callback interface to report channel events
+     *            - Callback interface to report channel events. Null for non-listening unidirectional sockets
      * @param nThreads
      *            - Number of threads to use for data transfer
      * @param maxConnectionAttempts
@@ -103,61 +103,13 @@
         perfCounters = new MuxDemuxPerformanceCounters();
     }
 
-    // Unidirectional MuxDemux: If this constructor is called, this MuxDemux will not listen to any incoming connections.
-    public MuxDemux(int nThreads, int maxConnectionAttempts) {
-        this.localAddress = null;
-        this.channelOpenListener = null;
-        this.maxConnectionAttempts = maxConnectionAttempts;
-        connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
-        this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
-            @Override
-            public void connectionEstablished(TCPConnection connection) {
-                MultiplexedConnection mConn;
-                synchronized (MuxDemux.this) {
-                    mConn = connectionMap.get(connection.getRemoteAddress());
-                }
-                assert mConn != null;
-                mConn.setTCPConnection(connection);
-                connection.setEventListener(mConn);
-                connection.setAttachment(mConn);
-            }
-
-            @Override
-            public void acceptedConnection(TCPConnection connection) {
-                // No implementation because we don't accept any connections.
-            }
-
-            @Override
-            public void connectionFailure(InetSocketAddress remoteAddress) {
-                MultiplexedConnection mConn;
-                synchronized (MuxDemux.this) {
-                    mConn = connectionMap.get(remoteAddress);
-                    assert mConn != null;
-                    int nConnectionAttempts = mConn.getConnectionAttempts();
-                    if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) {
-                        connectionMap.remove(remoteAddress);
-                        mConn.setConnectionFailure();
-                    } else {
-                        mConn.setConnectionAttempts(nConnectionAttempts + 1);
-                        tcpEndpoint.initiateConnection(remoteAddress);
-                    }
-                }
-            }
-        }, nThreads);
-        perfCounters = new MuxDemuxPerformanceCounters();
-    }
-
     /**
      * Starts listening for remote connections and is capable of initiating connections.
      * 
      * @throws IOException
      */
     public void start() throws IOException {
-        if (localAddress == null) {
-            tcpEndpoint.start();
-        } else {
-            tcpEndpoint.start(localAddress);
-        }
+        tcpEndpoint.start(localAddress);
     }
 
     /**
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 3a3252c..a9061e1 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -45,26 +45,23 @@
     }
 
     public void start(InetSocketAddress localAddress) throws IOException {
-        serverSocketChannel = ServerSocketChannel.open();
-        ServerSocket serverSocket = serverSocketChannel.socket();
-        serverSocket.bind(localAddress);
-        this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
-        ioThreads = new IOThread[nThreads];
-        for (int i = 0; i < ioThreads.length; ++i) {
-            ioThreads[i] = new IOThread();
+        // Setup a server socket listening channel only if the TCPEndpoint is a listening endpoint.
+        if (localAddress != null) {
+            serverSocketChannel = ServerSocketChannel.open();
+            ServerSocket serverSocket = serverSocketChannel.socket();
+            serverSocket.bind(localAddress);
+            this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
         }
-        ioThreads[0].registerServerSocket(serverSocketChannel);
-        for (int i = 0; i < ioThreads.length; ++i) {
-            ioThreads[i].start();
-        }
-    }
 
-    public void start() throws IOException {
-        this.localAddress = null;
         ioThreads = new IOThread[nThreads];
         for (int i = 0; i < ioThreads.length; ++i) {
             ioThreads[i] = new IOThread();
         }
+
+        if (localAddress != null) {
+            ioThreads[0].registerServerSocket(serverSocketChannel);
+        }
+
         for (int i = 0; i < ioThreads.length; ++i) {
             ioThreads[i].start();
         }