Added the ability to retry connections on failure to connect

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1211 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index ab3d5c1..c905f57 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -45,6 +45,10 @@
 
     private int lastChannelWritten;
 
+    private int nConnectionAttempts;
+
+    private boolean connectionFailure;
+
     public MultiplexedConnection(MuxDemux muxDemux) {
         this.muxDemux = muxDemux;
         pendingWriteEventsCounter = new IEventCounter() {
@@ -73,6 +77,15 @@
         readerState = new ReaderState();
         writerState = new WriterState();
         lastChannelWritten = -1;
+        connectionFailure = false;
+    }
+
+    int getConnectionAttempts() {
+        return nConnectionAttempts;
+    }
+
+    void setConnectionAttempts(int nConnectionAttempts) {
+        this.nConnectionAttempts = nConnectionAttempts;
     }
 
     synchronized void setTCPConnection(TCPConnection tcpConnection) {
@@ -81,10 +94,18 @@
         notifyAll();
     }
 
-    synchronized void waitUntilConnected() throws InterruptedException {
-        while (tcpConnection == null) {
+    synchronized void setConnectionFailure() {
+        this.connectionFailure = true;
+        notifyAll();
+    }
+
+    synchronized void waitUntilConnected() throws InterruptedException, NetException {
+        while (tcpConnection == null && !connectionFailure) {
             wait();
         }
+        if (connectionFailure) {
+            throw new NetException("Connection failure");
+        }
     }
 
     @Override
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 64ef6ae..8548bb8 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import edu.uci.ics.hyracks.net.exceptions.NetException;
 import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionListener;
 import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
 import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -58,6 +59,23 @@
                 connection.setEventListener(mConn);
                 connection.setAttachment(mConn);
             }
+
+            @Override
+            public void connectionFailure(InetSocketAddress remoteAddress) {
+                MultiplexedConnection mConn;
+                synchronized (MuxDemux.this) {
+                    mConn = connectionMap.get(remoteAddress);
+                    assert mConn != null;
+                    int nConnectionAttempts = mConn.getConnectionAttempts();
+                    if (nConnectionAttempts > 5) {
+                        connectionMap.remove(remoteAddress);
+                        mConn.setConnectionFailure();
+                    } else {
+                        mConn.setConnectionAttempts(nConnectionAttempts + 1);
+                        tcpEndpoint.initiateConnection(remoteAddress);
+                    }
+                }
+            }
         }, nThreads);
         perfCounters = new MuxDemuxPerformanceCounters();
     }
@@ -66,7 +84,7 @@
         tcpEndpoint.start(localAddress);
     }
 
-    public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException {
+    public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
         MultiplexedConnection mConn = null;
         synchronized (this) {
             mConn = connectionMap.get(remoteAddress);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
index cdaabf4..ead2a1f 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionListener.java
@@ -14,8 +14,12 @@
  */
 package edu.uci.ics.hyracks.net.protocols.tcp;
 
+import java.net.InetSocketAddress;
+
 public interface ITCPConnectionListener {
     public void acceptedConnection(TCPConnection connection);
 
     public void connectionEstablished(TCPConnection connection);
+
+    public void connectionFailure(InetSocketAddress remoteAddress);
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index d73ba21..1a73bdc 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -110,11 +110,22 @@
                         for (InetSocketAddress address : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
                             channel.configureBlocking(false);
-                            if (!channel.connect(address)) {
-                                channel.register(selector, SelectionKey.OP_CONNECT);
-                            } else {
-                                SelectionKey key = channel.register(selector, 0);
-                                createConnection(key, channel);
+                            boolean connect = false;
+                            boolean failure = false;
+                            try {
+                                connect = channel.connect(address);
+                            } catch (IOException e) {
+                                failure = true;
+                                connectionListener.connectionFailure(address);
+                            }
+                            if (!failure) {
+                                if (!connect) {
+                                    SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
+                                    key.attach(address);
+                                } else {
+                                    SelectionKey key = channel.register(selector, 0);
+                                    createConnection(key, channel);
+                                }
                             }
                         }
                         workingPendingConnections.clear();
@@ -150,7 +161,15 @@
                                 distributeIncomingConnection(channel);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
-                                if (channel.finishConnect()) {
+                                boolean finishConnect = false;
+                                try {
+                                    finishConnect = channel.finishConnect();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                    key.cancel();
+                                    connectionListener.connectionFailure((InetSocketAddress) key.attachment());
+                                }
+                                if (finishConnect) {
                                     createConnection(key, channel);
                                 }
                             }