Fixed potential race condition in IPC layer. Added more logging

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1027 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 99ddad5..3ce3475 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -43,13 +43,13 @@
 
     private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
 
-    private final List<IPCHandle>[] pendingConnections;
+    private final List<IPCHandle> pendingConnections;
 
-    private final List<Message>[] sendList;
+    private final List<IPCHandle> workingPendingConnections;
 
-    private int writerIndex;
+    private final List<Message> sendList;
 
-    private int readerIndex;
+    private final List<Message> workingSendList;
 
     private final InetSocketAddress address;
 
@@ -65,10 +65,10 @@
         socket.bind(socketAddress);
         address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
         ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
-        pendingConnections = new ArrayList[] { new ArrayList<IPCHandle>(), new ArrayList<IPCHandle>() };
-        sendList = new ArrayList[] { new ArrayList<Message>(), new ArrayList<Message>() };
-        writerIndex = 0;
-        readerIndex = 1;
+        pendingConnections = new ArrayList<IPCHandle>();
+        workingPendingConnections = new ArrayList<IPCHandle>();
+        sendList = new ArrayList<Message>();
+        workingSendList = new ArrayList<Message>();
     }
 
     InetSocketAddress getAddress() {
@@ -91,7 +91,7 @@
             handle = ipcHandleMap.get(remoteAddress);
             if (handle == null) {
                 handle = new IPCHandle(system, remoteAddress);
-                pendingConnections[writerIndex].add(handle);
+                pendingConnections.add(handle);
                 networkThread.selector.wakeup();
             }
         }
@@ -104,14 +104,20 @@
     }
 
     synchronized void write(Message msg) {
-        sendList[writerIndex].add(msg);
+        LOGGER.fine("Enqueued message: " + msg);
+        sendList.add(msg);
         networkThread.selector.wakeup();
     }
 
-    private synchronized void swapReadersAndWriters() {
-        int temp = readerIndex;
-        readerIndex = writerIndex;
-        writerIndex = temp;
+    private synchronized void collectOutstandingWork() {
+        if (!pendingConnections.isEmpty()) {
+            workingPendingConnections.addAll(pendingConnections);
+            pendingConnections.clear();
+        }
+        if (!sendList.isEmpty()) {
+            workingSendList.addAll(sendList);
+            sendList.clear();
+        }
     }
 
     private Message createInitialReqMessage(IPCHandle handle) {
@@ -162,11 +168,9 @@
                         LOGGER.fine("Starting Select");
                     }
                     int n = selector.select();
-                    if (pendingConnections[readerIndex].isEmpty() && sendList[readerIndex].isEmpty()) {
-                        swapReadersAndWriters();
-                    }
-                    if (!pendingConnections[readerIndex].isEmpty()) {
-                        for (IPCHandle handle : pendingConnections[readerIndex]) {
+                    collectOutstandingWork();
+                    if (!workingPendingConnections.isEmpty()) {
+                        for (IPCHandle handle : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
                             channel.configureBlocking(false);
                             SelectionKey cKey = null;
@@ -180,11 +184,12 @@
                             handle.setKey(cKey);
                             cKey.attach(handle);
                         }
-                        pendingConnections[readerIndex].clear();
+                        workingPendingConnections.clear();
                     }
-                    if (!sendList[readerIndex].isEmpty()) {
-                        for (Iterator<Message> i = sendList[readerIndex].iterator(); i.hasNext();) {
+                    if (!workingSendList.isEmpty()) {
+                        for (Iterator<Message> i = workingSendList.iterator(); i.hasNext();) {
                             Message msg = i.next();
+                            LOGGER.fine("Processing send of message: " + msg);
                             IPCHandle handle = msg.getIPCHandle();
                             if (handle.getState() == HandleState.CLOSED) {
                                 i.remove();
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
index ab3428e..6bff56f 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -125,4 +125,9 @@
         }
         return false;
     }
+
+    @Override
+    public String toString() {
+        return "MSG[" + messageId + ":" + requestMessageId + ":" + flag + ":" + payload + "]";
+    }
 }
\ No newline at end of file