Just use null as values to localAddress and listener for non listening TCP endpoints instead of copy pasting the constructor method.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2537 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
index 9d8b5a4..7aef8b9 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/net/ClientNetworkManager.java
@@ -31,7 +31,10 @@
private final MuxDemux md;
public ClientNetworkManager(int nThreads) throws IOException {
- md = new MuxDemux(nThreads, MAX_CONNECTION_ATTEMPTS);
+ /* This is a connect only socket and does not listen to any incoming connections, so pass null to
+ * localAddress and listener.
+ */
+ md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS);
}
public void start() throws IOException {
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 541253f..e4df6b9 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -48,9 +48,9 @@
* Constructor.
*
* @param localAddress
- * - TCP/IP socket address to listen on
+ * - TCP/IP socket address to listen on. Null for non-listening unidirectional sockets
* @param listener
- * - Callback interface to report channel events
+ * - Callback interface to report channel events. Null for non-listening unidirectional sockets
* @param nThreads
* - Number of threads to use for data transfer
* @param maxConnectionAttempts
@@ -103,61 +103,13 @@
perfCounters = new MuxDemuxPerformanceCounters();
}
- // Unidirectional MuxDemux: If this constructor is called, this MuxDemux will not listen to any incoming connections.
- public MuxDemux(int nThreads, int maxConnectionAttempts) {
- this.localAddress = null;
- this.channelOpenListener = null;
- this.maxConnectionAttempts = maxConnectionAttempts;
- connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
- this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
- @Override
- public void connectionEstablished(TCPConnection connection) {
- MultiplexedConnection mConn;
- synchronized (MuxDemux.this) {
- mConn = connectionMap.get(connection.getRemoteAddress());
- }
- assert mConn != null;
- mConn.setTCPConnection(connection);
- connection.setEventListener(mConn);
- connection.setAttachment(mConn);
- }
-
- @Override
- public void acceptedConnection(TCPConnection connection) {
- // No implementation because we don't accept any connections.
- }
-
- @Override
- public void connectionFailure(InetSocketAddress remoteAddress) {
- MultiplexedConnection mConn;
- synchronized (MuxDemux.this) {
- mConn = connectionMap.get(remoteAddress);
- assert mConn != null;
- int nConnectionAttempts = mConn.getConnectionAttempts();
- if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) {
- connectionMap.remove(remoteAddress);
- mConn.setConnectionFailure();
- } else {
- mConn.setConnectionAttempts(nConnectionAttempts + 1);
- tcpEndpoint.initiateConnection(remoteAddress);
- }
- }
- }
- }, nThreads);
- perfCounters = new MuxDemuxPerformanceCounters();
- }
-
/**
* Starts listening for remote connections and is capable of initiating connections.
*
* @throws IOException
*/
public void start() throws IOException {
- if (localAddress == null) {
- tcpEndpoint.start();
- } else {
- tcpEndpoint.start(localAddress);
- }
+ tcpEndpoint.start(localAddress);
}
/**
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 3a3252c..a9061e1 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -45,26 +45,23 @@
}
public void start(InetSocketAddress localAddress) throws IOException {
- serverSocketChannel = ServerSocketChannel.open();
- ServerSocket serverSocket = serverSocketChannel.socket();
- serverSocket.bind(localAddress);
- this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
- ioThreads = new IOThread[nThreads];
- for (int i = 0; i < ioThreads.length; ++i) {
- ioThreads[i] = new IOThread();
+ // Setup a server socket listening channel only if the TCPEndpoint is a listening endpoint.
+ if (localAddress != null) {
+ serverSocketChannel = ServerSocketChannel.open();
+ ServerSocket serverSocket = serverSocketChannel.socket();
+ serverSocket.bind(localAddress);
+ this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
}
- ioThreads[0].registerServerSocket(serverSocketChannel);
- for (int i = 0; i < ioThreads.length; ++i) {
- ioThreads[i].start();
- }
- }
- public void start() throws IOException {
- this.localAddress = null;
ioThreads = new IOThread[nThreads];
for (int i = 0; i < ioThreads.length; ++i) {
ioThreads[i] = new IOThread();
}
+
+ if (localAddress != null) {
+ ioThreads[0].registerServerSocket(serverSocketChannel);
+ }
+
for (int i = 0; i < ioThreads.length; ++i) {
ioThreads[i].start();
}