Allow TCPEndpoints without listeners by overloading the required methods.

Constructors and methods that mandate TCPEndpoints supporting listeners
are overloaded with methods that don't need listener socket information.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2513 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index c719bc4..541253f 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -103,13 +103,61 @@
         perfCounters = new MuxDemuxPerformanceCounters();
     }
 
+    // Unidirectional MuxDemux: If this constructor is called, this MuxDemux will not listen to any incoming connections.
+    public MuxDemux(int nThreads, int maxConnectionAttempts) {
+        this.localAddress = null;
+        this.channelOpenListener = null;
+        this.maxConnectionAttempts = maxConnectionAttempts;
+        connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+        this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
+            @Override
+            public void connectionEstablished(TCPConnection connection) {
+                MultiplexedConnection mConn;
+                synchronized (MuxDemux.this) {
+                    mConn = connectionMap.get(connection.getRemoteAddress());
+                }
+                assert mConn != null;
+                mConn.setTCPConnection(connection);
+                connection.setEventListener(mConn);
+                connection.setAttachment(mConn);
+            }
+
+            @Override
+            public void acceptedConnection(TCPConnection connection) {
+                // No implementation because we don't accept any connections.
+            }
+
+            @Override
+            public void connectionFailure(InetSocketAddress remoteAddress) {
+                MultiplexedConnection mConn;
+                synchronized (MuxDemux.this) {
+                    mConn = connectionMap.get(remoteAddress);
+                    assert mConn != null;
+                    int nConnectionAttempts = mConn.getConnectionAttempts();
+                    if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) {
+                        connectionMap.remove(remoteAddress);
+                        mConn.setConnectionFailure();
+                    } else {
+                        mConn.setConnectionAttempts(nConnectionAttempts + 1);
+                        tcpEndpoint.initiateConnection(remoteAddress);
+                    }
+                }
+            }
+        }, nThreads);
+        perfCounters = new MuxDemuxPerformanceCounters();
+    }
+
     /**
      * Starts listening for remote connections and is capable of initiating connections.
      * 
      * @throws IOException
      */
     public void start() throws IOException {
-        tcpEndpoint.start(localAddress);
+        if (localAddress == null) {
+            tcpEndpoint.start();
+        } else {
+            tcpEndpoint.start(localAddress);
+        }
     }
 
     /**
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index d13a17e..3a3252c 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -59,6 +59,17 @@
         }
     }
 
+    public void start() throws IOException {
+        this.localAddress = null;
+        ioThreads = new IOThread[nThreads];
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i] = new IOThread();
+        }
+        for (int i = 0; i < ioThreads.length; ++i) {
+            ioThreads[i].start();
+        }
+    }
+
     private synchronized int getNextThread() {
         int result = nextThread;
         nextThread = (nextThread + 1) % nThreads;