Fixed potential race condition in IPC layer. Added more logging
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1027 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 99ddad5..3ce3475 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -43,13 +43,13 @@
private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
- private final List<IPCHandle>[] pendingConnections;
+ private final List<IPCHandle> pendingConnections;
- private final List<Message>[] sendList;
+ private final List<IPCHandle> workingPendingConnections;
- private int writerIndex;
+ private final List<Message> sendList;
- private int readerIndex;
+ private final List<Message> workingSendList;
private final InetSocketAddress address;
@@ -65,10 +65,10 @@
socket.bind(socketAddress);
address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
- pendingConnections = new ArrayList[] { new ArrayList<IPCHandle>(), new ArrayList<IPCHandle>() };
- sendList = new ArrayList[] { new ArrayList<Message>(), new ArrayList<Message>() };
- writerIndex = 0;
- readerIndex = 1;
+ pendingConnections = new ArrayList<IPCHandle>();
+ workingPendingConnections = new ArrayList<IPCHandle>();
+ sendList = new ArrayList<Message>();
+ workingSendList = new ArrayList<Message>();
}
InetSocketAddress getAddress() {
@@ -91,7 +91,7 @@
handle = ipcHandleMap.get(remoteAddress);
if (handle == null) {
handle = new IPCHandle(system, remoteAddress);
- pendingConnections[writerIndex].add(handle);
+ pendingConnections.add(handle);
networkThread.selector.wakeup();
}
}
@@ -104,14 +104,20 @@
}
synchronized void write(Message msg) {
- sendList[writerIndex].add(msg);
+ LOGGER.fine("Enqueued message: " + msg);
+ sendList.add(msg);
networkThread.selector.wakeup();
}
- private synchronized void swapReadersAndWriters() {
- int temp = readerIndex;
- readerIndex = writerIndex;
- writerIndex = temp;
+ private synchronized void collectOutstandingWork() {
+ if (!pendingConnections.isEmpty()) {
+ workingPendingConnections.addAll(pendingConnections);
+ pendingConnections.clear();
+ }
+ if (!sendList.isEmpty()) {
+ workingSendList.addAll(sendList);
+ sendList.clear();
+ }
}
private Message createInitialReqMessage(IPCHandle handle) {
@@ -162,11 +168,9 @@
LOGGER.fine("Starting Select");
}
int n = selector.select();
- if (pendingConnections[readerIndex].isEmpty() && sendList[readerIndex].isEmpty()) {
- swapReadersAndWriters();
- }
- if (!pendingConnections[readerIndex].isEmpty()) {
- for (IPCHandle handle : pendingConnections[readerIndex]) {
+ collectOutstandingWork();
+ if (!workingPendingConnections.isEmpty()) {
+ for (IPCHandle handle : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
SelectionKey cKey = null;
@@ -180,11 +184,12 @@
handle.setKey(cKey);
cKey.attach(handle);
}
- pendingConnections[readerIndex].clear();
+ workingPendingConnections.clear();
}
- if (!sendList[readerIndex].isEmpty()) {
- for (Iterator<Message> i = sendList[readerIndex].iterator(); i.hasNext();) {
+ if (!workingSendList.isEmpty()) {
+ for (Iterator<Message> i = workingSendList.iterator(); i.hasNext();) {
Message msg = i.next();
+ LOGGER.fine("Processing send of message: " + msg);
IPCHandle handle = msg.getIPCHandle();
if (handle.getState() == HandleState.CLOSED) {
i.remove();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
index ab3428e..6bff56f 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -125,4 +125,9 @@
}
return false;
}
+
+ @Override
+ public String toString() {
+ return "MSG[" + messageId + ":" + requestMessageId + ":" + flag + ":" + payload + "]";
+ }
}
\ No newline at end of file