Some optimizations in the network layer
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1006 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 1abcdf6..8d5f475 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -35,7 +35,7 @@
this.ccb = ccb;
emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
for (int i = 0; i < nBuffers; ++i) {
- emptyQueue.add(ctx.allocateFrame());
+ emptyQueue.add(ByteBuffer.allocateDirect(ctx.getFrameSize()));
}
ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index e124851..be81ab2 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -115,7 +115,7 @@
private ChannelControlBlock ccb;
public WriterState() {
- writeBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+ writeBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
writeBuffer.flip();
command = new MuxDemuxCommand();
ccb = null;
@@ -254,7 +254,7 @@
private ChannelControlBlock ccb;
ReaderState() {
- readBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+ readBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
command = new MuxDemuxCommand();
}
@@ -267,82 +267,86 @@
void driveReaderStateMachine() throws IOException, NetException {
SocketChannel sc = tcpConnection.getSocketChannel();
- if (readerState.readBuffer.remaining() > 0) {
- int read = sc.read(readerState.readBuffer);
- if (read < 0) {
- throw new NetException("Socket Closed");
- }
- muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
+ boolean yield = false;
+ while (!yield) {
if (readerState.readBuffer.remaining() > 0) {
- return;
- }
- readerState.readBuffer.flip();
- readerState.command.read(readerState.readBuffer);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Received command: " + readerState.command);
- }
- ChannelControlBlock ccb = null;
- switch (readerState.command.getCommandType()) {
- case ADD_CREDITS: {
- synchronized (MultiplexedConnection.this) {
- ccb = cSet.getCCB(readerState.command.getChannelId());
- }
- ccb.addWriteCredits(readerState.command.getData());
- break;
+ int read = sc.read(readerState.readBuffer);
+ if (read < 0) {
+ throw new NetException("Socket Closed");
}
- case CLOSE_CHANNEL: {
- synchronized (MultiplexedConnection.this) {
- ccb = cSet.getCCB(readerState.command.getChannelId());
- }
- ccb.reportRemoteEOS();
- int channelId = ccb.getChannelId();
- cSet.markEOSAck(channelId);
- cSet.unmarkPendingCredits(channelId);
- break;
+ muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
+ if (readerState.readBuffer.remaining() > 0) {
+ return;
}
- case CLOSE_CHANNEL_ACK: {
- synchronized (MultiplexedConnection.this) {
- ccb = cSet.getCCB(readerState.command.getChannelId());
- }
- ccb.reportLocalEOSAck();
- break;
+ readerState.readBuffer.flip();
+ readerState.command.read(readerState.readBuffer);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Received command: " + readerState.command);
}
- case DATA: {
- synchronized (MultiplexedConnection.this) {
- ccb = cSet.getCCB(readerState.command.getChannelId());
+ ChannelControlBlock ccb = null;
+ switch (readerState.command.getCommandType()) {
+ case ADD_CREDITS: {
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.addWriteCredits(readerState.command.getData());
+ break;
}
- readerState.pendingReadSize = readerState.command.getData();
- readerState.ccb = ccb;
- break;
- }
- case ERROR: {
- synchronized (MultiplexedConnection.this) {
- ccb = cSet.getCCB(readerState.command.getChannelId());
+ case CLOSE_CHANNEL: {
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.reportRemoteEOS();
+ int channelId = ccb.getChannelId();
+ cSet.markEOSAck(channelId);
+ cSet.unmarkPendingCredits(channelId);
+ break;
}
- ccb.reportRemoteError(readerState.command.getData());
- int channelId = ccb.getChannelId();
- cSet.markEOSAck(channelId);
- cSet.unmarkPendingCredits(channelId);
- break;
+ case CLOSE_CHANNEL_ACK: {
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.reportLocalEOSAck();
+ break;
+ }
+ case DATA: {
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ readerState.pendingReadSize = readerState.command.getData();
+ readerState.ccb = ccb;
+ break;
+ }
+ case ERROR: {
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.reportRemoteError(readerState.command.getData());
+ int channelId = ccb.getChannelId();
+ cSet.markEOSAck(channelId);
+ cSet.unmarkPendingCredits(channelId);
+ break;
+ }
+ case OPEN_CHANNEL: {
+ int channelId = readerState.command.getChannelId();
+ ccb = cSet.registerChannel(channelId);
+ muxDemux.getChannelOpenListener().channelOpened(ccb);
+ }
}
- case OPEN_CHANNEL: {
- int channelId = readerState.command.getChannelId();
- ccb = cSet.registerChannel(channelId);
- muxDemux.getChannelOpenListener().channelOpened(ccb);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
}
}
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
- }
- }
- if (readerState.pendingReadSize > 0) {
- int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
- muxDemux.getPerformanceCounters().addPayloadBytesRead(readerState.pendingReadSize - newPendingReadSize);
- readerState.pendingReadSize = newPendingReadSize;
if (readerState.pendingReadSize > 0) {
- return;
+ yield = true;
+ int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+ muxDemux.getPerformanceCounters().addPayloadBytesRead(readerState.pendingReadSize - newPendingReadSize);
+ readerState.pendingReadSize = newPendingReadSize;
+ if (readerState.pendingReadSize > 0) {
+ return;
+ }
}
+ readerState.reset();
}
- readerState.reset();
}
}
\ No newline at end of file