Reduced object construction in message handling
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1123 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 053ac6b..eef068b 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -25,6 +25,7 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -113,12 +114,10 @@
private synchronized void collectOutstandingWork() {
if (!pendingConnections.isEmpty()) {
- workingPendingConnections.addAll(pendingConnections);
- pendingConnections.clear();
+ moveAll(pendingConnections, workingPendingConnections);
}
if (!sendList.isEmpty()) {
- workingSendList.addAll(sendList);
- sendList.clear();
+ moveAll(sendList, workingSendList);
}
}
@@ -164,6 +163,8 @@
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
+ BitSet unsentMessagesBitmap = new BitSet();
+ List<Message> tempUnsentMessages = new ArrayList<Message>();
while (!stopped) {
try {
if (LOGGER.isLoggable(Level.FINE)) {
@@ -189,35 +190,40 @@
workingPendingConnections.clear();
}
if (!workingSendList.isEmpty()) {
- for (Iterator<Message> i = workingSendList.iterator(); i.hasNext();) {
- Message msg = i.next();
+ unsentMessagesBitmap.clear();
+ int len = workingSendList.size();
+ for (int i = 0; i < len; ++i) {
+ Message msg = workingSendList.get(i);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Processing send of message: " + msg);
}
IPCHandle handle = msg.getIPCHandle();
- if (handle.getState() == HandleState.CLOSED) {
- i.remove();
- } else if (!handle.full()) {
- while (true) {
- ByteBuffer buffer = handle.getOutBuffer();
- buffer.compact();
- boolean success = msg.write(buffer);
- buffer.flip();
- if (success) {
- i.remove();
- SelectionKey key = handle.getKey();
- key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
- } else {
- if (!buffer.hasRemaining()) {
- handle.resizeOutBuffer();
- continue;
+ if (handle.getState() != HandleState.CLOSED) {
+ if (!handle.full()) {
+ while (true) {
+ ByteBuffer buffer = handle.getOutBuffer();
+ buffer.compact();
+ boolean success = msg.write(buffer);
+ buffer.flip();
+ if (success) {
+ SelectionKey key = handle.getKey();
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } else {
+ if (!buffer.hasRemaining()) {
+ handle.resizeOutBuffer();
+ continue;
+ }
+ handle.markFull();
+ unsentMessagesBitmap.set(i);
}
- handle.markFull();
+ break;
}
- break;
+ } else {
+ unsentMessagesBitmap.set(i);
}
}
}
+ copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
}
if (n > 0) {
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
@@ -281,5 +287,25 @@
}
}
}
+
+ private void copyUnsentMessages(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) {
+ tempUnsentMessages.clear();
+ for (int i = unsentMessagesBitmap.nextSetBit(0); i >= 0; i = unsentMessagesBitmap.nextSetBit(i + 1)) {
+ tempUnsentMessages.add(workingSendList.get(i));
+ }
+ workingSendList.clear();
+ int tempLen = tempUnsentMessages.size();
+ for (int i = 0; i < tempLen; ++i) {
+ workingSendList.add(tempUnsentMessages.get(i));
+ }
+ }
+ }
+
+ private <T> void moveAll(List<T> source, List<T> target) {
+ int len = source.size();
+ for (int i = 0; i < len; ++i) {
+ target.add(source.get(i));
+ }
+ source.clear();
}
}
\ No newline at end of file