Reduced object construction in message handling

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1123 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 053ac6b..eef068b 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -25,6 +25,7 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -113,12 +114,10 @@
 
     private synchronized void collectOutstandingWork() {
         if (!pendingConnections.isEmpty()) {
-            workingPendingConnections.addAll(pendingConnections);
-            pendingConnections.clear();
+            moveAll(pendingConnections, workingPendingConnections);
         }
         if (!sendList.isEmpty()) {
-            workingSendList.addAll(sendList);
-            sendList.clear();
+            moveAll(sendList, workingSendList);
         }
     }
 
@@ -164,6 +163,8 @@
             } catch (ClosedChannelException e) {
                 throw new RuntimeException(e);
             }
+            BitSet unsentMessagesBitmap = new BitSet();
+            List<Message> tempUnsentMessages = new ArrayList<Message>();
             while (!stopped) {
                 try {
                     if (LOGGER.isLoggable(Level.FINE)) {
@@ -189,35 +190,40 @@
                         workingPendingConnections.clear();
                     }
                     if (!workingSendList.isEmpty()) {
-                        for (Iterator<Message> i = workingSendList.iterator(); i.hasNext();) {
-                            Message msg = i.next();
+                        unsentMessagesBitmap.clear();
+                        int len = workingSendList.size();
+                        for (int i = 0; i < len; ++i) {
+                            Message msg = workingSendList.get(i);
                             if (LOGGER.isLoggable(Level.FINE)) {
                                 LOGGER.fine("Processing send of message: " + msg);
                             }
                             IPCHandle handle = msg.getIPCHandle();
-                            if (handle.getState() == HandleState.CLOSED) {
-                                i.remove();
-                            } else if (!handle.full()) {
-                                while (true) {
-                                    ByteBuffer buffer = handle.getOutBuffer();
-                                    buffer.compact();
-                                    boolean success = msg.write(buffer);
-                                    buffer.flip();
-                                    if (success) {
-                                        i.remove();
-                                        SelectionKey key = handle.getKey();
-                                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                                    } else {
-                                        if (!buffer.hasRemaining()) {
-                                            handle.resizeOutBuffer();
-                                            continue;
+                            if (handle.getState() != HandleState.CLOSED) {
+                                if (!handle.full()) {
+                                    while (true) {
+                                        ByteBuffer buffer = handle.getOutBuffer();
+                                        buffer.compact();
+                                        boolean success = msg.write(buffer);
+                                        buffer.flip();
+                                        if (success) {
+                                            SelectionKey key = handle.getKey();
+                                            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+                                        } else {
+                                            if (!buffer.hasRemaining()) {
+                                                handle.resizeOutBuffer();
+                                                continue;
+                                            }
+                                            handle.markFull();
+                                            unsentMessagesBitmap.set(i);
                                         }
-                                        handle.markFull();
+                                        break;
                                     }
-                                    break;
+                                } else {
+                                    unsentMessagesBitmap.set(i);
                                 }
                             }
                         }
+                        copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
                     }
                     if (n > 0) {
                         for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
@@ -281,5 +287,25 @@
                 }
             }
         }
+
+        private void copyUnsentMessages(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) {
+            tempUnsentMessages.clear();
+            for (int i = unsentMessagesBitmap.nextSetBit(0); i >= 0; i = unsentMessagesBitmap.nextSetBit(i + 1)) {
+                tempUnsentMessages.add(workingSendList.get(i));
+            }
+            workingSendList.clear();
+            int tempLen = tempUnsentMessages.size();
+            for (int i = 0; i < tempLen; ++i) {
+                workingSendList.add(tempUnsentMessages.get(i));
+            }
+        }
+    }
+
+    private <T> void moveAll(List<T> source, List<T> target) {
+        int len = source.size();
+        for (int i = 0; i < len; ++i) {
+            target.add(source.get(i));
+        }
+        source.clear();
     }
 }
\ No newline at end of file