Some optimizations in the network layer

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1006 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 1abcdf6..8d5f475 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -35,7 +35,7 @@
         this.ccb = ccb;
         emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
         for (int i = 0; i < nBuffers; ++i) {
-            emptyQueue.add(ctx.allocateFrame());
+            emptyQueue.add(ByteBuffer.allocateDirect(ctx.getFrameSize()));
         }
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
     }
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index e124851..be81ab2 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -115,7 +115,7 @@
         private ChannelControlBlock ccb;
 
         public WriterState() {
-            writeBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+            writeBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
             writeBuffer.flip();
             command = new MuxDemuxCommand();
             ccb = null;
@@ -254,7 +254,7 @@
         private ChannelControlBlock ccb;
 
         ReaderState() {
-            readBuffer = ByteBuffer.allocate(MuxDemuxCommand.COMMAND_SIZE);
+            readBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
             command = new MuxDemuxCommand();
         }
 
@@ -267,82 +267,86 @@
 
     void driveReaderStateMachine() throws IOException, NetException {
         SocketChannel sc = tcpConnection.getSocketChannel();
-        if (readerState.readBuffer.remaining() > 0) {
-            int read = sc.read(readerState.readBuffer);
-            if (read < 0) {
-                throw new NetException("Socket Closed");
-            }
-            muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
+        boolean yield = false;
+        while (!yield) {
             if (readerState.readBuffer.remaining() > 0) {
-                return;
-            }
-            readerState.readBuffer.flip();
-            readerState.command.read(readerState.readBuffer);
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Received command: " + readerState.command);
-            }
-            ChannelControlBlock ccb = null;
-            switch (readerState.command.getCommandType()) {
-                case ADD_CREDITS: {
-                    synchronized (MultiplexedConnection.this) {
-                        ccb = cSet.getCCB(readerState.command.getChannelId());
-                    }
-                    ccb.addWriteCredits(readerState.command.getData());
-                    break;
+                int read = sc.read(readerState.readBuffer);
+                if (read < 0) {
+                    throw new NetException("Socket Closed");
                 }
-                case CLOSE_CHANNEL: {
-                    synchronized (MultiplexedConnection.this) {
-                        ccb = cSet.getCCB(readerState.command.getChannelId());
-                    }
-                    ccb.reportRemoteEOS();
-                    int channelId = ccb.getChannelId();
-                    cSet.markEOSAck(channelId);
-                    cSet.unmarkPendingCredits(channelId);
-                    break;
+                muxDemux.getPerformanceCounters().addSignalingBytesRead(read);
+                if (readerState.readBuffer.remaining() > 0) {
+                    return;
                 }
-                case CLOSE_CHANNEL_ACK: {
-                    synchronized (MultiplexedConnection.this) {
-                        ccb = cSet.getCCB(readerState.command.getChannelId());
-                    }
-                    ccb.reportLocalEOSAck();
-                    break;
+                readerState.readBuffer.flip();
+                readerState.command.read(readerState.readBuffer);
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Received command: " + readerState.command);
                 }
-                case DATA: {
-                    synchronized (MultiplexedConnection.this) {
-                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                ChannelControlBlock ccb = null;
+                switch (readerState.command.getCommandType()) {
+                    case ADD_CREDITS: {
+                        synchronized (MultiplexedConnection.this) {
+                            ccb = cSet.getCCB(readerState.command.getChannelId());
+                        }
+                        ccb.addWriteCredits(readerState.command.getData());
+                        break;
                     }
-                    readerState.pendingReadSize = readerState.command.getData();
-                    readerState.ccb = ccb;
-                    break;
-                }
-                case ERROR: {
-                    synchronized (MultiplexedConnection.this) {
-                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                    case CLOSE_CHANNEL: {
+                        synchronized (MultiplexedConnection.this) {
+                            ccb = cSet.getCCB(readerState.command.getChannelId());
+                        }
+                        ccb.reportRemoteEOS();
+                        int channelId = ccb.getChannelId();
+                        cSet.markEOSAck(channelId);
+                        cSet.unmarkPendingCredits(channelId);
+                        break;
                     }
-                    ccb.reportRemoteError(readerState.command.getData());
-                    int channelId = ccb.getChannelId();
-                    cSet.markEOSAck(channelId);
-                    cSet.unmarkPendingCredits(channelId);
-                    break;
+                    case CLOSE_CHANNEL_ACK: {
+                        synchronized (MultiplexedConnection.this) {
+                            ccb = cSet.getCCB(readerState.command.getChannelId());
+                        }
+                        ccb.reportLocalEOSAck();
+                        break;
+                    }
+                    case DATA: {
+                        synchronized (MultiplexedConnection.this) {
+                            ccb = cSet.getCCB(readerState.command.getChannelId());
+                        }
+                        readerState.pendingReadSize = readerState.command.getData();
+                        readerState.ccb = ccb;
+                        break;
+                    }
+                    case ERROR: {
+                        synchronized (MultiplexedConnection.this) {
+                            ccb = cSet.getCCB(readerState.command.getChannelId());
+                        }
+                        ccb.reportRemoteError(readerState.command.getData());
+                        int channelId = ccb.getChannelId();
+                        cSet.markEOSAck(channelId);
+                        cSet.unmarkPendingCredits(channelId);
+                        break;
+                    }
+                    case OPEN_CHANNEL: {
+                        int channelId = readerState.command.getChannelId();
+                        ccb = cSet.registerChannel(channelId);
+                        muxDemux.getChannelOpenListener().channelOpened(ccb);
+                    }
                 }
-                case OPEN_CHANNEL: {
-                    int channelId = readerState.command.getChannelId();
-                    ccb = cSet.registerChannel(channelId);
-                    muxDemux.getChannelOpenListener().channelOpened(ccb);
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
                 }
             }
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
-            }
-        }
-        if (readerState.pendingReadSize > 0) {
-            int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
-            muxDemux.getPerformanceCounters().addPayloadBytesRead(readerState.pendingReadSize - newPendingReadSize);
-            readerState.pendingReadSize = newPendingReadSize;
             if (readerState.pendingReadSize > 0) {
-                return;
+                yield = true;
+                int newPendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
+                muxDemux.getPerformanceCounters().addPayloadBytesRead(readerState.pendingReadSize - newPendingReadSize);
+                readerState.pendingReadSize = newPendingReadSize;
+                if (readerState.pendingReadSize > 0) {
+                    return;
+                }
             }
+            readerState.reset();
         }
-        readerState.reset();
     }
 }
\ No newline at end of file