Fixed channel cleanup.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@997 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 51ab199..a281fe8 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
public class ChannelControlBlock {
private final ChannelSet cSet;
@@ -36,6 +37,8 @@
private final AtomicBoolean localClose;
+ private final AtomicBoolean localCloseAck;
+
private final AtomicBoolean remoteClose;
ChannelControlBlock(ChannelSet cSet, int channelId) {
@@ -44,6 +47,7 @@
this.ri = new ReadInterface();
this.wi = new WriteInterface();
localClose = new AtomicBoolean();
+ localCloseAck = new AtomicBoolean();
remoteClose = new AtomicBoolean();
}
@@ -205,7 +209,7 @@
return fba;
}
- void write(MultiplexedConnection.WriterState writerState) {
+ void write(MultiplexedConnection.WriterState writerState) throws NetException {
if (currentWriteBuffer == null) {
currentWriteBuffer = wiFullQueue.poll();
}
@@ -261,7 +265,7 @@
}
}
- synchronized void write(MultiplexedConnection.WriterState writerState) {
+ synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
wi.write(writerState);
}
@@ -291,6 +295,10 @@
remoteClose.set(true);
}
+ synchronized void reportLocalEOSAck() {
+ localCloseAck.set(true);
+ }
+
synchronized void reportRemoteError(int ecode) {
ri.flush();
ri.fba.error(ecode);
@@ -298,12 +306,12 @@
}
boolean completelyClosed() {
- return localClose.get() && remoteClose.get();
+ return localCloseAck.get() && remoteClose.get();
}
@Override
public String toString() {
- return "Channel:" + channelId + "[localClose: " + localClose + " remoteClose: " + remoteClose
- + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
+ return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
+ + " remoteClose: " + remoteClose + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
}
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index b13fc94..ab5fd40 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -16,10 +16,14 @@
import java.util.Arrays;
import java.util.BitSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.net.exceptions.NetException;
public class ChannelSet {
+ private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
+
private static final int MAX_OPEN_CHANNELS = 1024;
private static final int INITIAL_SIZE = 16;
@@ -36,6 +40,8 @@
private final BitSet pendingChannelSynBitmap;
+ private final BitSet pendingEOSAckBitmap;
+
private int openChannelCount;
private final IEventCounter pendingWriteEventsCounter;
@@ -47,6 +53,7 @@
pendingChannelWriteBitmap = new BitSet();
pendingChannelCreditsBitmap = new BitSet();
pendingChannelSynBitmap = new BitSet();
+ pendingEOSAckBitmap = new BitSet();
this.pendingWriteEventsCounter = pendingWriteEventsCounter;
openChannelCount = 0;
}
@@ -54,10 +61,10 @@
ChannelControlBlock allocateChannel() throws NetException {
synchronized (mConn) {
int idx = allocationBitmap.nextClearBit(0);
- if (idx < 0) {
+ if (idx < 0 || idx == ccbArray.length) {
cleanupClosedChannels();
idx = allocationBitmap.nextClearBit(0);
- if (idx < 0) {
+ if (idx < 0 || idx == ccbArray.length) {
idx = ccbArray.length;
}
}
@@ -70,6 +77,9 @@
ChannelControlBlock ccb = ccbArray[i];
if (ccb != null) {
if (ccb.completelyClosed()) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Cleaning free channel: " + ccb);
+ }
freeChannel(ccb);
}
}
@@ -77,7 +87,9 @@
}
ChannelControlBlock registerChannel(int channelId) throws NetException {
- return createChannel(channelId);
+ synchronized (mConn) {
+ return createChannel(channelId);
+ }
}
private void freeChannel(ChannelControlBlock channel) {
@@ -103,6 +115,10 @@
return pendingChannelSynBitmap;
}
+ BitSet getPendingEOSAckBitmap() {
+ return pendingEOSAckBitmap;
+ }
+
int getOpenChannelCount() {
return openChannelCount;
}
@@ -132,7 +148,7 @@
}
}
- public void unmarkPendingWrite(int channelId) {
+ void unmarkPendingWrite(int channelId) {
synchronized (mConn) {
assert pendingChannelWriteBitmap.get(channelId);
pendingChannelWriteBitmap.clear(channelId);
@@ -140,6 +156,14 @@
}
}
+ void markEOSAck(int channelId) {
+ synchronized (mConn) {
+ assert !pendingEOSAckBitmap.get(channelId);
+ pendingEOSAckBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+
private ChannelControlBlock createChannel(int idx) throws NetException {
if (idx >= ccbArray.length) {
expand(idx);
@@ -147,6 +171,15 @@
if (idx > MAX_OPEN_CHANNELS) {
throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
}
+ if (ccbArray[idx] != null) {
+ assert ccbArray[idx].completelyClosed();
+ if (ccbArray[idx].completelyClosed()) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Cleaning free channel: " + ccbArray[idx]);
+ }
+ freeChannel(ccbArray[idx]);
+ }
+ }
assert idx < ccbArray.length;
assert !allocationBitmap.get(idx);
ChannelControlBlock channel = new ChannelControlBlock(this, idx);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index e7e65e7..7f610de 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -86,7 +86,8 @@
}
@Override
- public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException, NetException {
+ public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
+ NetException {
if (readable) {
driveReaderStateMachine();
}
@@ -146,7 +147,7 @@
assert pendingWriteSize <= pendingBuffer.remaining();
int oldLimit = pendingBuffer.limit();
try {
- pendingBuffer.limit(pendingWriteSize);
+ pendingBuffer.limit(pendingWriteSize + pendingBuffer.position());
int written = sc.write(pendingBuffer);
pendingWriteSize -= written;
} finally {
@@ -167,7 +168,7 @@
}
}
- void driveWriterStateMachine() throws IOException {
+ void driveWriterStateMachine() throws IOException, NetException {
SocketChannel sc = tcpConnection.getSocketChannel();
if (writerState.writePending()) {
if (!writerState.performPendingWrite(sc)) {
@@ -210,6 +211,18 @@
return;
}
}
+ BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
+ for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
+ pendingEOSAckBitmap.clear(j);
+ pendingWriteEventsCounter.decrement();
+ writerState.command.setChannelId(j);
+ writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
+ writerState.command.setData(0);
+ writerState.reset(null, 0, null);
+ if (!writerState.performPendingWrite(sc)) {
+ return;
+ }
+ }
BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
if (lastChannelWritten < 0) {
@@ -262,9 +275,9 @@
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Received command: " + readerState.command);
}
+ ChannelControlBlock ccb = null;
switch (readerState.command.getCommandType()) {
case ADD_CREDITS: {
- ChannelControlBlock ccb;
synchronized (MultiplexedConnection.this) {
ccb = cSet.getCCB(readerState.command.getChannelId());
}
@@ -272,15 +285,21 @@
break;
}
case CLOSE_CHANNEL: {
- ChannelControlBlock ccb;
synchronized (MultiplexedConnection.this) {
ccb = cSet.getCCB(readerState.command.getChannelId());
}
ccb.reportRemoteEOS();
+ cSet.markEOSAck(ccb.getChannelId());
+ break;
+ }
+ case CLOSE_CHANNEL_ACK: {
+ synchronized (MultiplexedConnection.this) {
+ ccb = cSet.getCCB(readerState.command.getChannelId());
+ }
+ ccb.reportLocalEOSAck();
break;
}
case DATA: {
- ChannelControlBlock ccb;
synchronized (MultiplexedConnection.this) {
ccb = cSet.getCCB(readerState.command.getChannelId());
}
@@ -289,22 +308,22 @@
break;
}
case ERROR: {
- ChannelControlBlock ccb;
synchronized (MultiplexedConnection.this) {
ccb = cSet.getCCB(readerState.command.getChannelId());
}
ccb.reportRemoteError(readerState.command.getData());
+ cSet.markEOSAck(ccb.getChannelId());
break;
}
case OPEN_CHANNEL: {
int channelId = readerState.command.getChannelId();
- ChannelControlBlock ccb;
- synchronized (MultiplexedConnection.this) {
- ccb = cSet.registerChannel(channelId);
- }
+ ccb = cSet.registerChannel(channelId);
muxDemux.getChannelOpenListener().channelOpened(ccb);
}
}
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Applied command: " + readerState.command + " on " + ccb);
+ }
}
if (readerState.pendingReadSize > 0) {
readerState.pendingReadSize = readerState.ccb.read(sc, readerState.pendingReadSize);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
index 8fee065..9124aad 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -2,12 +2,19 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+
class MuxDemuxCommand {
+ static final int MAX_CHANNEL_ID = 0x3ff;
+
static final int COMMAND_SIZE = 4;
+ static final int MAX_DATA_VALUE = 0x7ffff;
+
enum CommandType {
OPEN_CHANNEL,
CLOSE_CHANNEL,
+ CLOSE_CHANNEL_ACK,
ERROR,
ADD_CREDITS,
DATA,
@@ -23,7 +30,10 @@
return channelId;
}
- public void setChannelId(int channelId) {
+ public void setChannelId(int channelId) throws NetException {
+ if (channelId > MAX_CHANNEL_ID) {
+ throw new NetException("channelId " + channelId + " exceeds " + MAX_CHANNEL_ID);
+ }
this.channelId = channelId;
}
@@ -39,7 +49,10 @@
return data;
}
- public void setData(int data) {
+ public void setData(int data) throws NetException {
+ if (channelId > MAX_DATA_VALUE) {
+ throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
+ }
this.data = data;
}