Allow retries on IPCSystem.getHandle().
NC will retry indefinitely to connect to the CC.
Change-Id: I0f4c15cacd265c3fbe85307af9f5c33577035447
Reviewed-on: https://asterix-gerrit.ics.uci.edu/250
Reviewed-by: Chris Hillery <ceej@lambda.nu>
Tested-by: Chris Hillery <ceej@lambda.nu>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 02cdd70..88c05be 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -22,9 +22,7 @@
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
@@ -37,8 +35,6 @@
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -260,7 +256,7 @@
init();
datasetNetworkManager.start();
- IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
index 8a5b979..d597741 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/HandleState.java
@@ -19,5 +19,6 @@
CONNECT_SENT,
CONNECT_RECEIVED,
CONNECTED,
+ CONNECT_FAILED,
CLOSED,
}
\ No newline at end of file
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 81294c2..00f1f46 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -87,18 +87,39 @@
serverSocketChannel.close();
}
- IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
+ IPCHandle getIPCHandle(InetSocketAddress remoteAddress, int retries) throws IOException, InterruptedException {
IPCHandle handle;
- synchronized (this) {
- handle = ipcHandleMap.get(remoteAddress);
- if (handle == null) {
- handle = new IPCHandle(system, remoteAddress);
- pendingConnections.add(handle);
- networkThread.selector.wakeup();
+ int attempt = 1;
+ while (true) {
+ synchronized (this) {
+ handle = ipcHandleMap.get(remoteAddress);
+ if (handle == null) {
+ handle = new IPCHandle(system, remoteAddress);
+ pendingConnections.add(handle);
+ networkThread.selector.wakeup();
+ }
+ }
+ if (handle.waitTillConnected()) {
+ return handle;
+ }
+ if (retries < 0) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Connection to " + remoteAddress + " failed, retrying...");
+ }
+ } else if (attempt < retries) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Connection to " + remoteAddress +
+ " failed (Attempt " + attempt + " of " + retries + ")");
+ }
+ } else {
+ throw new IOException("Connection failed to " + remoteAddress);
}
+ // Count the attempt and back off outside the logging guards: when INFO is
+ // disabled the loop must still sleep between attempts, and a bounded retry
+ // count must still advance so the IOException above is eventually thrown.
+ attempt++;
+ Thread.sleep(5000);
}
- handle.waitTillConnected();
- return handle;
}
synchronized void registerHandle(IPCHandle handle) {
@@ -278,13 +299,21 @@
handle.setState(HandleState.CONNECT_RECEIVED);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
- if (channel.finishConnect()) {
- IPCHandle handle = (IPCHandle) key.attachment();
- handle.setState(HandleState.CONNECT_SENT);
- registerHandle(handle);
- key.interestOps(SelectionKey.OP_READ);
- write(createInitialReqMessage(handle));
+ IPCHandle handle = (IPCHandle) key.attachment();
+ try {
+ if (!channel.finishConnect()) {
+ throw new Exception("Connection did not finish");
+ }
}
+ catch (Exception e) {
+ e.printStackTrace();
+ handle.setState(HandleState.CONNECT_FAILED);
+ continue;
+ }
+ handle.setState(HandleState.CONNECT_SENT);
+ registerHandle(handle);
+ key.interestOps(SelectionKey.OP_READ);
+ write(createInitialReqMessage(handle));
}
}
}
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index 4908091..337440f 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -113,10 +113,11 @@
notifyAll();
}
- synchronized void waitTillConnected() throws InterruptedException {
- while (!isConnected()) {
+ synchronized boolean waitTillConnected() throws InterruptedException {
+ while (state != HandleState.CONNECTED && state != HandleState.CONNECT_FAILED) {
wait();
}
+ return state == HandleState.CONNECTED;
}
ByteBuffer getInBuffer() {
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 9e7198b..35525c0 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -56,8 +56,12 @@
}
public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
+ return getHandle(remoteAddress, 0);
+ }
+
+ public IIPCHandle getHandle(InetSocketAddress remoteAddress, int retries) throws IPCException {
try {
- return cMgr.getIPCHandle(remoteAddress);
+ return cMgr.getIPCHandle(remoteAddress, retries);
} catch (IOException e) {
throw new IPCException(e);
} catch (InterruptedException e) {