Allow TCPEndpoints without listeners by overloading the required methods.
Constructors and methods that mandate TCPEndpoints supporting listeners
are overloaded with methods that don't need listener socket information.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2513 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index c719bc4..541253f 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -103,13 +103,61 @@
perfCounters = new MuxDemuxPerformanceCounters();
}
+ // Unidirectional MuxDemux: If this constructor is called, this MuxDemux will not listen to any incoming connections.
+ public MuxDemux(int nThreads, int maxConnectionAttempts) {
+ this.localAddress = null;
+ this.channelOpenListener = null;
+ this.maxConnectionAttempts = maxConnectionAttempts;
+ connectionMap = new HashMap<InetSocketAddress, MultiplexedConnection>();
+ this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() {
+ @Override
+ public void connectionEstablished(TCPConnection connection) {
+ MultiplexedConnection mConn;
+ synchronized (MuxDemux.this) {
+ mConn = connectionMap.get(connection.getRemoteAddress());
+ }
+ assert mConn != null;
+ mConn.setTCPConnection(connection);
+ connection.setEventListener(mConn);
+ connection.setAttachment(mConn);
+ }
+
+ @Override
+ public void acceptedConnection(TCPConnection connection) {
+ // No implementation because we don't accept any connections.
+ }
+
+ @Override
+ public void connectionFailure(InetSocketAddress remoteAddress) {
+ MultiplexedConnection mConn;
+ synchronized (MuxDemux.this) {
+ mConn = connectionMap.get(remoteAddress);
+ assert mConn != null;
+ int nConnectionAttempts = mConn.getConnectionAttempts();
+ if (nConnectionAttempts > MuxDemux.this.maxConnectionAttempts) {
+ connectionMap.remove(remoteAddress);
+ mConn.setConnectionFailure();
+ } else {
+ mConn.setConnectionAttempts(nConnectionAttempts + 1);
+ tcpEndpoint.initiateConnection(remoteAddress);
+ }
+ }
+ }
+ }, nThreads);
+ perfCounters = new MuxDemuxPerformanceCounters();
+ }
+
/**
* Starts listening for remote connections and is capable of initiating connections.
*
* @throws IOException
*/
public void start() throws IOException {
- tcpEndpoint.start(localAddress);
+ if (localAddress == null) {
+ tcpEndpoint.start();
+ } else {
+ tcpEndpoint.start(localAddress);
+ }
}
/**
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index d13a17e..3a3252c 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -59,6 +59,17 @@
}
}
+ public void start() throws IOException {
+ this.localAddress = null;
+ ioThreads = new IOThread[nThreads];
+ for (int i = 0; i < ioThreads.length; ++i) {
+ ioThreads[i] = new IOThread();
+ }
+ for (int i = 0; i < ioThreads.length; ++i) {
+ ioThreads[i].start();
+ }
+ }
+
private synchronized int getNextThread() {
int result = nextThread;
nextThread = (nextThread + 1) % nThreads;