Allow retries on IPCSystem.getHandle().

NC will retry indefinitely to connect CC.

Change-Id: I0f4c15cacd265c3fbe85307af9f5c33577035447
Reviewed-on: https://asterix-gerrit.ics.uci.edu/250
Reviewed-by: Chris Hillery <ceej@lambda.nu>
Tested-by: Chris Hillery <ceej@lambda.nu>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 02cdd70..88c05be 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -22,9 +22,7 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
@@ -37,8 +35,6 @@
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -260,7 +256,7 @@
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
index 8a5b979..d597741 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -19,5 +19,6 @@
     CONNECT_SENT,
     CONNECT_RECEIVED,
     CONNECTED,
+    CONNECT_FAILED,
     CLOSED,
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 81294c2..00f1f46 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -87,18 +87,39 @@
         serverSocketChannel.close();
     }
 
-    IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+    IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
         IPCHandle handle;
-        synchronized (this) {
-            handle = ipcHandleMap.get(remoteAddress);
-            if (handle == null) {
-                handle = new IPCHandle(system, remoteAddress);
-                pendingConnections.add(handle);
-                networkThread.selector.wakeup();
+        int attempt = 1;
+        while (true) {
+            synchronized (this) {
+                handle = ipcHandleMap.get(remoteAddress);
+                if (handle == null) {
+                    handle = new IPCHandle(system, remoteAddress);
+                    pendingConnections.add(handle);
+                    networkThread.selector.wakeup();
+                }
+            }
+            if (handle.waitTillConnected()) {
+                return handle;
+            }
+            if (retries < 0) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
+                    attempt++;
+                    Thread.sleep(5000);
+                }
+            } else if (attempt < retries) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Connection to " + remoteAddress +
+                            " failed (Attempt " + attempt + " of " + retries + ")");
+                    attempt++;
+                    Thread.sleep(5000);
+                }
+            } else {
+                throw new IOException("Connection failed to " + remoteAddress);
             }
         }
-        handle.waitTillConnected();
-        return handle;
+
     }
 
     synchronized void registerHandle(IPCHandle handle) {
@@ -278,13 +299,21 @@
                                 handle.setState(HandleState.CONNECT_RECEIVED);
                             } else if (key.isConnectable()) {
                                 SocketChannel channel = (SocketChannel) sc;
-                                if (channel.finishConnect()) {
-                                    IPCHandle handle = (IPCHandle) key.attachment();
-                                    handle.setState(HandleState.CONNECT_SENT);
-                                    registerHandle(handle);
-                                    key.interestOps(SelectionKey.OP_READ);
-                                    write(createInitialReqMessage(handle));
+                                IPCHandle handle = (IPCHandle) key.attachment();
+                                try {
+                                    if (!channel.finishConnect()) {
+                                        throw new Exception("Connection did not finish");
+                                    }
                                 }
+                                catch (Exception e) {
+                                    e.printStackTrace();
+                                    handle.setState(HandleState.CONNECT_FAILED);
+                                    continue;
+                                }
+                                handle.setState(HandleState.CONNECT_SENT);
+                                registerHandle(handle);
+                                key.interestOps(SelectionKey.OP_READ);
+                                write(createInitialReqMessage(handle));
                             }
                         }
                     }
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 4908091..337440f 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -113,10 +113,11 @@
         notifyAll();
     }
 
-    synchronized void waitTillConnected() throws InterruptedException {
-        while (!isConnected()) {
+    synchronized boolean waitTillConnected() throws InterruptedException {
+        while (state != HandleState.CONNECTED && state != HandleState.CONNECT_FAILED) {
             wait();
         }
+        return state == HandleState.CONNECTED;
     }
 
     ByteBuffer getInBuffer() {
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9e7198b..35525c0 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -56,8 +56,12 @@
     }
 
     public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+        return getHandle(remoteAddress, 0);
+    }
+
+    public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
         try {
-            return cMgr.getIPCHandle(remoteAddress);
+            return cMgr.getIPCHandle(remoteAddress, retries);
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {