Added the ability to retry connections on failure to connect
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1211 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index ab3d5c1..c905f57 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -45,6 +45,10 @@
private int lastChannelWritten;
+ private int nConnectionAttempts;
+
+ private boolean connectionFailure;
+
public MultiplexedConnection(MuxDemux muxDemux) {
this.muxDemux = muxDemux;
pendingWriteEventsCounter = new IEventCounter() {
@@ -73,6 +77,15 @@
readerState = new ReaderState();
writerState = new WriterState();
lastChannelWritten = -1;
+ connectionFailure = false;
+ }
+
+ int getConnectionAttempts() {
+ return nConnectionAttempts;
+ }
+
+ void setConnectionAttempts(int nConnectionAttempts) {
+ this.nConnectionAttempts = nConnectionAttempts;
}
synchronized void setTCPConnection(TCPConnection tcpConnection) {
@@ -81,10 +94,18 @@
notifyAll();
}
- synchronized void waitUntilConnected() throws InterruptedException {
- while (tcpConnection == null) {
+ synchronized void setConnectionFailure() {
+ this.connectionFailure = true;
+ notifyAll();
+ }
+
+ synchronized void waitUntilConnected() throws InterruptedException, NetException {
+ while (tcpConnection == null && !connectionFailure) {
wait();
}
+ if (connectionFailure) {
+ throw new NetException("Connection failure");
+ }
}
@Override
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 64ef6ae..8548bb8 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionListener;
import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -58,6 +59,23 @@
connection.setEventListener(mConn);
connection.setAttachment(mConn);
}
+
+ @Override
+ public void connectionFailure(InetSocketAddress remoteAddress) {
+ MultiplexedConnection mConn;
+ synchronized (MuxDemux.this) {
+ mConn = connectionMap.get(remoteAddress);
+ assert mConn != null;
+ int nConnectionAttempts = mConn.getConnectionAttempts();
+ if (nConnectionAttempts > 5) {
+ connectionMap.remove(remoteAddress);
+ mConn.setConnectionFailure();
+ } else {
+ mConn.setConnectionAttempts(nConnectionAttempts + 1);
+ tcpEndpoint.initiateConnection(remoteAddress);
+ }
+ }
+ }
}, nThreads);
perfCounters = new MuxDemuxPerformanceCounters();
}
@@ -66,7 +84,7 @@
tcpEndpoint.start(localAddress);
}
- public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException {
+ public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
MultiplexedConnection mConn = null;
synchronized (this) {
mConn = connectionMap.get(remoteAddress);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
index cdaabf4..ead2a1f 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -14,8 +14,12 @@
*/
package edu.uci.ics.hyracks.net.protocols.tcp;
+import java.net.InetSocketAddress;
+
public interface ITCPConnectionListener {
public void acceptedConnection(TCPConnection connection);
public void connectionEstablished(TCPConnection connection);
+
+ public void connectionFailure(InetSocketAddress remoteAddress);
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index d73ba21..1a73bdc 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -110,11 +110,22 @@
for (InetSocketAddress address : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
- if (!channel.connect(address)) {
- channel.register(selector, SelectionKey.OP_CONNECT);
- } else {
- SelectionKey key = channel.register(selector, 0);
- createConnection(key, channel);
+ boolean connect = false;
+ boolean failure = false;
+ try {
+ connect = channel.connect(address);
+ } catch (IOException e) {
+ failure = true;
+ connectionListener.connectionFailure(address);
+ }
+ if (!failure) {
+ if (!connect) {
+ SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
+ key.attach(address);
+ } else {
+ SelectionKey key = channel.register(selector, 0);
+ createConnection(key, channel);
+ }
}
}
workingPendingConnections.clear();
@@ -150,7 +161,15 @@
distributeIncomingConnection(channel);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
- if (channel.finishConnect()) {
+ boolean finishConnect = false;
+ try {
+ finishConnect = channel.finishConnect();
+ } catch (Exception e) {
+ e.printStackTrace();
+ key.cancel();
+ connectionListener.connectionFailure((InetSocketAddress) key.attachment());
+ }
+ if (finishConnect) {
createConnection(key, channel);
}
}