Fixed channel cleanup.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@997 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 51ab199..a281fe8 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -24,6 +24,7 @@
 
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
 
 public class ChannelControlBlock {
     private final ChannelSet cSet;
@@ -36,6 +37,8 @@
 
     private final AtomicBoolean localClose;
 
+    private final AtomicBoolean localCloseAck;
+
     private final AtomicBoolean remoteClose;
 
     ChannelControlBlock(ChannelSet cSet, int channelId) {
@@ -44,6 +47,7 @@
         this.ri = new ReadInterface();
         this.wi = new WriteInterface();
         localClose = new AtomicBoolean();
+        localCloseAck = new AtomicBoolean();
         remoteClose = new AtomicBoolean();
     }
 
@@ -205,7 +209,7 @@
             return fba;
         }
 
-        void write(MultiplexedConnection.WriterState writerState) {
+        void write(MultiplexedConnection.WriterState writerState) throws NetException {
             if (currentWriteBuffer == null) {
                 currentWriteBuffer = wiFullQueue.poll();
             }
@@ -261,7 +265,7 @@
         }
     }
 
-    synchronized void write(MultiplexedConnection.WriterState writerState) {
+    synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
         wi.write(writerState);
     }
 
@@ -291,6 +295,10 @@
         remoteClose.set(true);
     }
 
+    synchronized void reportLocalEOSAck() {
+        localCloseAck.set(true);
+    }
+
     synchronized void reportRemoteError(int ecode) {
         ri.flush();
         ri.fba.error(ecode);
@@ -298,12 +306,12 @@
     }
 
     boolean completelyClosed() {
-        return localClose.get() && remoteClose.get();
+        return localCloseAck.get() && remoteClose.get();
     }
 
     @Override
     public String toString() {
-        return "Channel:" + channelId + "[localClose: " + localClose + " remoteClose: " + remoteClose
-                + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
+        return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
+                + " remoteClose: " + remoteClose + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
     }
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index b13fc94..ab5fd40 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -16,10 +16,14 @@
 
 import java.util.Arrays;
 import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.net.exceptions.NetException;
 
 public class ChannelSet {
+    private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
+
     private static final int MAX_OPEN_CHANNELS = 1024;
 
     private static final int INITIAL_SIZE = 16;
@@ -36,6 +40,8 @@
 
     private final BitSet pendingChannelSynBitmap;
 
+    private final BitSet pendingEOSAckBitmap;
+
     private int openChannelCount;
 
     private final IEventCounter pendingWriteEventsCounter;
@@ -47,6 +53,7 @@
         pendingChannelWriteBitmap = new BitSet();
         pendingChannelCreditsBitmap = new BitSet();
         pendingChannelSynBitmap = new BitSet();
+        pendingEOSAckBitmap = new BitSet();
         this.pendingWriteEventsCounter = pendingWriteEventsCounter;
         openChannelCount = 0;
     }
@@ -54,10 +61,10 @@
     ChannelControlBlock allocateChannel() throws NetException {
         synchronized (mConn) {
             int idx = allocationBitmap.nextClearBit(0);
-            if (idx < 0) {
+            if (idx < 0 || idx == ccbArray.length) {
                 cleanupClosedChannels();
                 idx = allocationBitmap.nextClearBit(0);
-                if (idx < 0) {
+                if (idx < 0 || idx == ccbArray.length) {
                     idx = ccbArray.length;
                 }
             }
@@ -70,6 +77,9 @@
             ChannelControlBlock ccb = ccbArray[i];
             if (ccb != null) {
                 if (ccb.completelyClosed()) {
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Cleaning free channel: " + ccb);
+                    }
                     freeChannel(ccb);
                 }
             }
@@ -77,7 +87,9 @@
     }
 
     ChannelControlBlock registerChannel(int channelId) throws NetException {
-        return createChannel(channelId);
+        synchronized (mConn) {
+            return createChannel(channelId);
+        }
     }
 
     private void freeChannel(ChannelControlBlock channel) {
@@ -103,6 +115,10 @@
         return pendingChannelSynBitmap;
     }
 
+    BitSet getPendingEOSAckBitmap() {
+        return pendingEOSAckBitmap;
+    }
+
     int getOpenChannelCount() {
         return openChannelCount;
     }
@@ -132,7 +148,7 @@
         }
     }
 
-    public void unmarkPendingWrite(int channelId) {
+    void unmarkPendingWrite(int channelId) {
         synchronized (mConn) {
             assert pendingChannelWriteBitmap.get(channelId);
             pendingChannelWriteBitmap.clear(channelId);
@@ -140,6 +156,14 @@
         }
     }
 
+    void markEOSAck(int channelId) {
+        synchronized (mConn) {
+            assert !pendingEOSAckBitmap.get(channelId);
+            pendingEOSAckBitmap.set(channelId);
+            pendingWriteEventsCounter.increment();
+        }
+    }
+
     private ChannelControlBlock createChannel(int idx) throws NetException {
         if (idx >= ccbArray.length) {
             expand(idx);
@@ -147,6 +171,15 @@
         if (idx > MAX_OPEN_CHANNELS) {
             throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
         }
+        if (ccbArray[idx] != null) {
+            assert ccbArray[idx].completelyClosed();
+            if (ccbArray[idx].completelyClosed()) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Cleaning free channel: " + ccbArray[idx]);
+                }
+                freeChannel(ccbArray[idx]);
+            }
+        }
         assert idx < ccbArray.length;
         assert !allocationBitmap.get(idx);
         ChannelControlBlock channel = new ChannelControlBlock(this, idx);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index e7e65e7..7f610de 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -86,7 +86,8 @@
     }
 
     @Override
-    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException, NetException {
+    public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
+            NetException {
         if (readable) {
             driveReaderStateMachine();
         }
@@ -146,7 +147,7 @@
                     assert pendingWriteSize <= pendingBuffer.remaining();
                     int oldLimit = pendingBuffer.limit();
                     try {
-                        pendingBuffer.limit(pendingWriteSize);
+                        pendingBuffer.limit(pendingWriteSize + pendingBuffer.position());
                         int written = sc.write(pendingBuffer);
                         pendingWriteSize -= written;
                     } finally {
@@ -167,7 +168,7 @@
         }
     }
 
-    void driveWriterStateMachine() throws IOException {
+    void driveWriterStateMachine() throws IOException, NetException {
         SocketChannel sc = tcpConnection.getSocketChannel();
         if (writerState.writePending()) {
             if (!writerState.performPendingWrite(sc)) {
@@ -210,6 +211,18 @@
                         return;
                     }
                 }
+                BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
+                for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
+                    pendingEOSAckBitmap.clear(j);
+                    pendingWriteEventsCounter.decrement();
+                    writerState.command.setChannelId(j);
+                    writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
+                    writerState.command.setData(0);
+                    writerState.reset(null, 0, null);
+                    if (!writerState.performPendingWrite(sc)) {
+                        return;
+                    }
+                }
                 BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
                 lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
                 if (lastChannelWritten < 0) {
@@ -262,9 +275,9 @@
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Received command: " + readerState.command);
             }
+            ChannelControlBlock ccb = null;
             switch (readerState.command.getCommandType()) {
                 case ADD_CREDITS: {
-                    ChannelControlBlock ccb;
                     synchronized (MultiplexedConnection.this) {
                         ccb = cSet.getCCB(readerState.command.getChannelId());
                     }
@@ -272,15 +285,21 @@
                     break;
                 }
                 case CLOSE_CHANNEL: {
-                    ChannelControlBlock ccb;
                     synchronized (MultiplexedConnection.this) {
                         ccb = cSet.getCCB(readerState.command.getChannelId());
                     }
                     ccb.reportRemoteEOS();
+                    cSet.markEOSAck(ccb.getChannelId());
+                    break;
+                }
+                case CLOSE_CHANNEL_ACK: {
+                    synchronized (MultiplexedConnection.this) {
+                        ccb = cSet.getCCB(readerState.command.getChannelId());
+                    }
+                    ccb.reportLocalEOSAck();
                     break;
                 }
                 case DATA: {
-                    ChannelControlBlock ccb;
                     synchronized (MultiplexedConnection.this) {
                         ccb = cSet.getCCB(readerState.command.getChannelId());
                     }
@@ -289,22 +308,22 @@
                     break;
                 }
                 case ERROR: {
-                    ChannelControlBlock ccb;
                     synchronized (MultiplexedConnection.this) {
                         ccb = cSet.getCCB(readerState.command.getChannelId());
                     }
                     ccb.reportRemoteError(readerState.command.getData());
+                    cSet.markEOSAck(ccb.getChannelId());
                     break;
                 }
                 case OPEN_CHANNEL: {
                     int channelId = readerState.command.getChannelId();
-                    ChannelControlBlock ccb;
-                    synchronized (MultiplexedConnection.this) {
-                        ccb = cSet.registerChannel(channelId);
-                    }
+                    ccb = cSet.registerChannel(channelId);
                     muxDemux.getChannelOpenListener().channelOpened(ccb);
                 }
             }
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
+            }
         }
         if (readerState.pendingReadSize > 0) {
             readerState.pendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
index 8fee065..9124aad 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -2,12 +2,19 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
 class MuxDemuxCommand {
+    static final int MAX_CHANNEL_ID = 0x3ff;
+
     static final int COMMAND_SIZE = 4;
 
+    static final int MAX_DATA_VALUE = 0x7ffff;
+
     enum CommandType {
         OPEN_CHANNEL,
         CLOSE_CHANNEL,
+        CLOSE_CHANNEL_ACK,
         ERROR,
         ADD_CREDITS,
         DATA,
@@ -23,7 +30,10 @@
         return channelId;
     }
 
-    public void setChannelId(int channelId) {
+    public void setChannelId(int channelId) throws NetException {
+        if (channelId > MAX_CHANNEL_ID) {
+            throw new NetException("channelId " + channelId + " exceeds " + MAX_CHANNEL_ID);
+        }
         this.channelId = channelId;
     }
 
@@ -39,7 +49,10 @@
         return data;
     }
 
-    public void setData(int data) {
+    public void setData(int data) throws NetException {
+        if (channelId > MAX_DATA_VALUE) {
+            throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
+        }
         this.data = data;
     }