[NO ISSUE][NET] IPC Connections Improvements
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Refactor IPCConnectionManager.
- Halt on IPC message serialization failures.
- Ensure channels and handles are closed on connection
failures.
- Remove IPCHandle unneeded CONNECT_FAILED state.
- Fix RegisterNodeWork failure handling.
- Consistently use NodeControllerRemoteProxy for NC RPC.
Change-Id: I4049b16573c13fcdb1b12c0b6b2a97ee1fcc709e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2617
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index b44a6bb8..2b03324 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -105,9 +105,7 @@
failNode(nodeId);
}
try {
- // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
- IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
- ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
+ ncState.getNodeController().abortJobs(ccs.getCcId());
} catch (IPCException e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index de7d941..00693df 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -28,11 +28,8 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.apache.hyracks.ipc.api.IIPCHandle;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,36 +49,33 @@
@Override
protected void doRun() throws Exception {
String id = reg.getNodeId();
- // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
- IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
- CCNCFunctions.NodeRegistrationResult result;
- Map<IOption, Object> ncConfiguration = new HashMap<>();
+ LOGGER.warn("Registering node: {}", id);
+ NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
+ ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
+ INodeManager nodeManager = ccs.getNodeManager();
try {
- LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
- NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
- ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
NodeControllerState state = new NodeControllerState(nc, reg);
- INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+ final Map<IOption, Object> ncConfiguration = new HashMap<>();
for (IOption option : cfg.getOptions()) {
ncConfiguration.put(option, cfg.get(option));
}
- LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+ LOGGER.warn("Registered node: {}", id);
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
params.setDistributedState(ccs.getContext().getDistributedState());
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
params.setRegistrationId(registrationId);
- result = new CCNCFunctions.NodeRegistrationResult(params, null);
+ LOGGER.warn("sending registration response to node {}", id);
+ nc.sendRegistrationResult(params, null);
+ LOGGER.warn("notifying node {} joined", id);
+ ccs.getContext().notifyNodeJoin(id, ncConfiguration);
} catch (Exception e) {
- LOGGER.log(Level.WARN, "Node registration failed", e);
- result = new CCNCFunctions.NodeRegistrationResult(null, e);
+ LOGGER.error("Node {} registration failed", id, e);
+ nodeManager.removeNode(id);
+ nc.sendRegistrationResult(null, e);
}
- LOGGER.warn("sending registration response to node");
- ncIPCHandle.send(-1, result, null);
- LOGGER.warn("notifying node join");
- ccs.getContext().notifyNodeJoin(id, ncConfiguration);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 9959e34..cc2ed46 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -134,7 +134,6 @@
IPCSystem ipcSystem = Mockito.mock(IPCSystem.class);
IIPCHandle ipcHandle = Mockito.mock(IIPCHandle.class);
Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem);
- Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle);
Mockito.when(ipcSystem.getHandle(Mockito.any(), Mockito.anyInt())).thenReturn(ipcHandle);
Mockito.when(ccs.getExecutor()).thenReturn(Executors.newCachedThreadPool());
return ccs;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 92764a7..78cd44d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -33,7 +34,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.ipc.exceptions.IPCException;
public interface INodeController {
void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
@@ -62,4 +65,22 @@
void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
void takeThreadDump(String requestId) throws Exception;
+
+ /**
+ * Sends a request to this {@link INodeController} to abort all jobs
+ * started by cluster controller with id {@code ccId}
+ *
+ * @param ccId
+ * @throws IPCException
+ */
+ void abortJobs(CcId ccId) throws IPCException;
+
+ /**
+ * Sends node registration result to this {@link INodeController}.
+ *
+ * @param parameters
+ * @param regFailure
+ * @throws IPCException
+ */
+ void sendRegistrationResult(NodeParameters parameters, Exception regFailure) throws IPCException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b78e53f..d6867eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.base.INodeController;
+import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortTasksFunction;
import org.apache.hyracks.control.common.ipc.CCNCFunctions.CleanupJobletFunction;
import org.apache.hyracks.control.common.ipc.CCNCFunctions.DeployBinaryFunction;
@@ -50,6 +51,7 @@
import org.apache.hyracks.control.common.ipc.CCNCFunctions.UndeployJobSpecFunction;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
public class NodeControllerRemoteProxy implements INodeController {
private final CcId ccId;
@@ -136,6 +138,16 @@
ipcHandle.send(-1, fn, null);
}
+ @Override
+ public void abortJobs(CcId ccId) throws IPCException {
+ ipcHandle.send(-1, new CCNCFunctions.AbortCCJobsFunction(ccId), null);
+ }
+
+ @Override
+ public void sendRegistrationResult(NodeParameters parameters, Exception regFailure) throws IPCException {
+ ipcHandle.send(-1, new CCNCFunctions.NodeRegistrationResult(parameters, regFailure), null);
+ }
+
public InetSocketAddress getAddress() {
return ipcHandle.getRemoteAddress();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 33d1d60..6d4f173 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -46,7 +46,7 @@
public void run() {
NCServiceContext ctx = ncs.getContext();
try {
- IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);;
+ IMessage data = (IMessage) DeploymentUtils.deserialize(message, deploymentId, ctx);
if (ctx.getMessageBroker() != null) {
ctx.getMessageBroker().receivedMessage(data, nodeId);
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 7f59db1..1f9ec37 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -41,10 +41,6 @@
</properties>
<dependencies>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
index 912c267..39a29af 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HandleState.java
@@ -23,6 +23,5 @@
CONNECT_SENT,
CONNECT_RECEIVED,
CONNECTED,
- CONNECT_FAILED,
CLOSED,
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 3e6c64b..040fe03 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -18,12 +18,13 @@
*/
package org.apache.hyracks.ipc.impl;
+import static org.apache.hyracks.util.ExitUtil.EC_IMMEDIATE_HALT;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -39,7 +40,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -100,7 +101,7 @@
void stop() {
stopped = true;
- IOUtils.closeQuietly(serverSocketChannel);
+ NetworkUtil.closeQuietly(serverSocketChannel);
networkThread.selector.wakeup();
}
@@ -121,8 +122,10 @@
return handle;
}
if (maxRetries < 0 || retries++ < maxRetries) {
- LOGGER.warn("Connection to " + remoteAddress + " failed; retrying" + (maxRetries <= 0 ? ""
- : " (retry attempt " + retries + " of " + maxRetries + ") after " + delay + "ms"));
+ if (LOGGER.isWarnEnabled()) {
+ LOGGER.warn("Connection to " + remoteAddress + " failed; retrying" + (maxRetries <= 0 ? ""
+ : " (retry attempt " + retries + " of " + maxRetries + ") after " + delay + "ms"));
+ }
Thread.sleep(delay);
delay = Math.min(MAX_RETRY_DELAY_MILLIS, (int) (delay * 1.5));
} else {
@@ -144,24 +147,6 @@
networkThread.selector.wakeup();
}
- private synchronized void collectOutstandingWork() {
- if (!pendingConnections.isEmpty()) {
- moveAll(pendingConnections, workingPendingConnections);
- }
- if (!sendList.isEmpty()) {
- moveAll(sendList, workingSendList);
- }
- }
-
- private Message createInitialReqMessage(IPCHandle handle) {
- Message msg = new Message(handle);
- msg.setMessageId(system.createMessageId());
- msg.setRequestMessageId(-1);
- msg.setFlag(Message.INITIAL_REQ);
- msg.setPayload(address);
- return msg;
- }
-
private Message createInitialAckMessage(IPCHandle handle, Message req) {
Message msg = new Message(handle);
msg.setMessageId(system.createMessageId());
@@ -177,16 +162,18 @@
private class NetworkThread extends Thread {
private final Selector selector;
-
private final Set<SocketChannel> openChannels = new HashSet<>();
+ private final BitSet unsentMessagesBitmap = new BitSet();
+ private final List<Message> tempUnsentMessages = new ArrayList<>();
- public NetworkThread() {
+ NetworkThread() {
super("IPC Network Listener Thread [" + address + "]");
setDaemon(true);
try {
selector = Selector.open();
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException(e);
}
}
@@ -200,105 +187,19 @@
}
private void doRun() {
- try {
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- } catch (ClosedChannelException e) {
- throw new RuntimeException(e);
- }
- BitSet unsentMessagesBitmap = new BitSet();
- List<Message> tempUnsentMessages = new ArrayList<>();
int failingLoops = 0;
while (!stopped) {
try {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Starting Select");
- }
int n = selector.select();
collectOutstandingWork();
if (!workingPendingConnections.isEmpty()) {
- for (IPCHandle handle : workingPendingConnections) {
- SocketChannel channel = SocketChannel.open();
- register(channel);
- SelectionKey cKey;
- if (channel.connect(handle.getRemoteAddress())) {
- cKey = channel.register(selector, SelectionKey.OP_READ);
- handle.setState(HandleState.CONNECT_SENT);
- IPCConnectionManager.this.write(createInitialReqMessage(handle));
- } else {
- cKey = channel.register(selector, SelectionKey.OP_CONNECT);
- }
- handle.setKey(cKey);
- cKey.attach(handle);
- }
- workingPendingConnections.clear();
+ establishPendingConnections();
}
if (!workingSendList.isEmpty()) {
- unsentMessagesBitmap.clear();
- int len = workingSendList.size();
- for (int i = 0; i < len; ++i) {
- Message msg = workingSendList.get(i);
- LOGGER.debug(() -> "Processing send of message: " + msg);
- IPCHandle handle = msg.getIPCHandle();
- if (handle.getState() != HandleState.CLOSED) {
- if (!handle.full()) {
- while (true) {
- ByteBuffer buffer = handle.getOutBuffer();
- buffer.compact();
- boolean success = msg.write(buffer);
- buffer.flip();
- if (success) {
- system.getPerformanceCounters().addMessageSentCount(1);
- SelectionKey key = handle.getKey();
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else {
- if (!buffer.hasRemaining()) {
- handle.resizeOutBuffer();
- continue;
- }
- handle.markFull();
- unsentMessagesBitmap.set(i);
- }
- break;
- }
- } else {
- unsentMessagesBitmap.set(i);
- }
- }
- }
- copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
+ sendPendingMessages();
}
if (n > 0) {
- for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
- SelectionKey key = i.next();
- i.remove();
- final SelectableChannel sc = key.channel();
- if (key.isReadable()) {
- read(key);
- } else if (key.isWritable()) {
- write(key);
- } else if (key.isAcceptable()) {
- assert sc == serverSocketChannel;
- SocketChannel channel = serverSocketChannel.accept();
- register(channel);
- IPCHandle handle = new IPCHandle(system, null);
- SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
- handle.setKey(cKey);
- cKey.attach(handle);
- handle.setState(HandleState.CONNECT_RECEIVED);
- } else if (key.isConnectable()) {
- SocketChannel channel = (SocketChannel) sc;
- IPCHandle handle = (IPCHandle) key.attachment();
- if (!finishConnect(channel)) {
- handle.setState(HandleState.CONNECT_FAILED);
- continue;
- }
-
- handle.setState(HandleState.CONNECT_SENT);
- registerHandle(handle);
- key.interestOps(SelectionKey.OP_READ);
- IPCConnectionManager.this.write(createInitialReqMessage(handle));
- }
- }
+ processSelectedKeys();
}
// reset failingLoops on a good loop
failingLoops = 0;
@@ -314,25 +215,146 @@
}
}
- private void cleanup() {
- for (Channel channel : openChannels) {
- IOUtils.closeQuietly(channel);
+ private void processSelectedKeys() {
+ for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+ SelectionKey key = i.next();
+ i.remove();
+ final SelectableChannel sc = key.channel();
+ if (key.isReadable()) {
+ read(key);
+ } else if (key.isWritable()) {
+ write(key);
+ } else if (key.isAcceptable()) {
+ assert sc == serverSocketChannel;
+ accept();
+ } else if (key.isConnectable()) {
+ finishConnect(key);
+ }
}
- openChannels.clear();
- IOUtils.closeQuietly(selector);
}
- private boolean finishConnect(SocketChannel channel) {
- boolean connectFinished = false;
+ private void finishConnect(SelectionKey connectableKey) {
+ SocketChannel channel = (SocketChannel) connectableKey.channel();
+ IPCHandle handle = (IPCHandle) connectableKey.attachment();
+ boolean connected = false;
try {
- connectFinished = channel.finishConnect();
- if (!connectFinished) {
- LOGGER.log(Level.WARN, "Channel connect did not finish");
+ connected = channel.finishConnect();
+ if (connected) {
+ connectableKey.interestOps(SelectionKey.OP_READ);
+ connectionEstablished(handle);
}
} catch (IOException e) {
- LOGGER.log(Level.WARN, "Exception finishing channel connect", e);
+ LOGGER.warn("Exception finishing connect", e);
+ } finally {
+ if (!connected) {
+ LOGGER.warn("Failed to finish connect to {}", handle.getRemoteAddress());
+ close(connectableKey, channel);
+ }
}
- return connectFinished;
+ }
+
+ private void accept() {
+ SocketChannel channel = null;
+ SelectionKey channelKey = null;
+ try {
+ channel = serverSocketChannel.accept();
+ register(channel);
+ channelKey = channel.register(selector, SelectionKey.OP_READ);
+ IPCHandle handle = new IPCHandle(system, null);
+ handle.setKey(channelKey);
+ channelKey.attach(handle);
+ handle.setState(HandleState.CONNECT_RECEIVED);
+ } catch (IOException e) {
+ LOGGER.error("Failed to accept channel ", e);
+ close(channelKey, channel);
+ }
+ }
+
+ private void establishPendingConnections() {
+ for (IPCHandle handle : workingPendingConnections) {
+ SocketChannel channel = null;
+ SelectionKey channelKey = null;
+ try {
+ channel = SocketChannel.open();
+ register(channel);
+ if (channel.connect(handle.getRemoteAddress())) {
+ channelKey = channel.register(selector, SelectionKey.OP_READ);
+ connectionEstablished(handle);
+ } else {
+ channelKey = channel.register(selector, SelectionKey.OP_CONNECT);
+ }
+ handle.setKey(channelKey);
+ channelKey.attach(handle);
+ } catch (IOException e) {
+ LOGGER.error("Failed to accept channel ", e);
+ close(channelKey, channel);
+ }
+ }
+ workingPendingConnections.clear();
+ }
+
+ private void connectionEstablished(IPCHandle handle) {
+ handle.setState(HandleState.CONNECT_SENT);
+ registerHandle(handle);
+ IPCConnectionManager.this.write(createInitialReqMessage(handle));
+ }
+
+ private void sendPendingMessages() {
+ unsentMessagesBitmap.clear();
+ int len = workingSendList.size();
+ for (int i = 0; i < len; ++i) {
+ Message msg = workingSendList.get(i);
+ final boolean sent = sendMessage(msg);
+ if (!sent) {
+ unsentMessagesBitmap.set(i);
+ }
+ }
+ copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
+ }
+
+ private boolean sendMessage(Message msg) {
+ LOGGER.debug("Processing send of message: {}", msg);
+ IPCHandle handle = msg.getIPCHandle();
+ if (handle.getState() == HandleState.CLOSED) {
+ // message will never be sent
+ return true;
+ }
+ if (handle.full()) {
+ return false;
+ }
+ try {
+ while (true) {
+ ByteBuffer buffer = handle.getOutBuffer();
+ buffer.compact();
+ boolean success = msg.write(buffer);
+ buffer.flip();
+ if (success) {
+ system.getPerformanceCounters().addMessageSentCount(1);
+ SelectionKey key = handle.getKey();
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ return true;
+ } else {
+ if (!buffer.hasRemaining()) {
+ handle.resizeOutBuffer();
+ continue;
+ }
+ handle.markFull();
+ return false;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.fatal("Unrecoverable networking failure; Halting...", e);
+ ExitUtil.halt(EC_IMMEDIATE_HALT);
+ }
+ return false;
+ }
+
+ private void cleanup() {
+ for (Channel channel : openChannels) {
+ NetworkUtil.closeQuietly(channel);
+ }
+ openChannels.clear();
+ NetworkUtil.closeQuietly(selector);
}
private void copyUnsentMessages(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) {
@@ -396,19 +418,42 @@
}
private void close(SelectionKey key, SocketChannel sc) {
- key.cancel();
- NetworkUtil.closeQuietly(sc);
- openChannels.remove(sc);
- final IPCHandle handle = (IPCHandle) key.attachment();
- handle.close();
+ if (key != null) {
+ final Object attachment = key.attachment();
+ if (attachment != null) {
+ ((IPCHandle) attachment).close();
+ }
+ key.cancel();
+ }
+ if (sc != null) {
+ NetworkUtil.closeQuietly(sc);
+ openChannels.remove(sc);
+ }
}
- }
- private <T> void moveAll(List<T> source, List<T> target) {
- int len = source.size();
- for (int i = 0; i < len; ++i) {
- target.add(source.get(i));
+ private void collectOutstandingWork() {
+ synchronized (IPCConnectionManager.this) {
+ if (!pendingConnections.isEmpty()) {
+ moveAll(pendingConnections, workingPendingConnections);
+ }
+ if (!sendList.isEmpty()) {
+ moveAll(sendList, workingSendList);
+ }
+ }
}
- source.clear();
+
+ private Message createInitialReqMessage(IPCHandle handle) {
+ Message msg = new Message(handle);
+ msg.setMessageId(system.createMessageId());
+ msg.setRequestMessageId(-1);
+ msg.setFlag(Message.INITIAL_REQ);
+ msg.setPayload(address);
+ return msg;
+ }
+
+ private <T> void moveAll(List<T> source, List<T> target) {
+ target.addAll(source);
+ source.clear();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index d63bfbd..09c7c97 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -62,10 +62,6 @@
return system;
}
- void setRemoteAddress(InetSocketAddress remoteAddress) {
- this.remoteAddress = remoteAddress;
- }
-
@Override
public long send(long requestId, Object req, Exception exception) throws IPCException {
if (!isConnected()) {
@@ -127,7 +123,6 @@
wait();
break;
case CONNECTED:
- case CONNECT_FAILED:
case CLOSED:
return state == HandleState.CONNECTED;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index b36e645..b7dcf05 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -66,10 +66,6 @@
cMgr.stop();
}
- public IIPCHandle getHandle(InetSocketAddress remoteAddress) throws IPCException {
- return getHandle(remoteAddress, 0);
- }
-
public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries) throws IPCException {
return getHandle(remoteAddress, maxRetries, 0);
}
@@ -78,16 +74,6 @@
return getReconnectingHandle(remoteAddress, 1);
}
- public IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, int reconnectAttempts)
- throws IPCException {
- return getHandle(remoteAddress, 0, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
- }
-
- public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts)
- throws IPCException {
- return getHandle(remoteAddress, maxRetries, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
- }
-
public IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts,
IIPCEventListener eventListener) throws IPCException {
if (reconnectAttempts > 0) {
@@ -132,4 +118,14 @@
public IPCPerformanceCounters getPerformanceCounters() {
return perfCounters;
}
+
+ private IIPCHandle getReconnectingHandle(InetSocketAddress remoteAddress, int reconnectAttempts)
+ throws IPCException {
+ return getHandle(remoteAddress, 0, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
+ }
+
+ private IIPCHandle getHandle(InetSocketAddress remoteAddress, int maxRetries, int reconnectAttempts)
+ throws IPCException {
+ return getHandle(remoteAddress, maxRetries, reconnectAttempts, NoOpIPCEventListener.INSTANCE);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index cca3abe..70a0e18 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -43,7 +43,7 @@
IPCSystem client = createClientIPCSystem(rpci);
client.start();
- IIPCHandle handle = client.getHandle(serverAddr);
+ IIPCHandle handle = client.getHandle(serverAddr, 0);
for (int i = 0; i < 100; ++i) {
Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 06aeef5..1863b28 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.util;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
@@ -45,12 +46,12 @@
sc.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
}
- public static void closeQuietly(SocketChannel sc) {
- if (sc.isOpen()) {
+ public static void closeQuietly(Closeable closeable) {
+ if (closeable != null) {
try {
- sc.close();
+ closeable.close();
} catch (IOException e) {
- LOGGER.warn("Failed to close socket", e);
+ LOGGER.warn("Failed to close", e);
}
}
}