Fixed potential infinite wait bug in writer state machine. Fixed credit transmission to respect command data bounds.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1069 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 7f6853b..e3d057a 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -76,27 +76,23 @@
if (remoteClose.get()) {
return;
}
- int delta;
+ int delta = buffer.remaining();
synchronized (ChannelControlBlock.this) {
riEmptyQueue.add(buffer);
- delta = buffer.remaining();
}
- credits.addAndGet(delta);
- if (delta != 0) {
- cSet.markPendingCredits(channelId);
- }
+ cSet.addPendingCredits(channelId, delta);
}
};
private ICloseableBufferAcceptor fba;
- private final AtomicInteger credits;
+ private volatile int credits;
private ByteBuffer currentReadBuffer;
ReadInterface() {
riEmptyQueue = new LinkedList<ByteBuffer>();
- credits = new AtomicInteger();
+ credits = 0;
}
@Override
@@ -290,12 +286,12 @@
return ri.read(sc, size);
}
- void addReadCredits(int delta) {
- ri.credits.addAndGet(delta);
+ int getReadCredits() {
+ return ri.credits;
}
- int getAndResetReadCredits() {
- return ri.credits.getAndSet(0);
+ void setReadCredits(int credits) {
+ this.ri.credits = credits;
}
void addWriteCredits(int delta) {
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index 9411d42..104f98a 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -131,9 +131,16 @@
}
}
- void markPendingCredits(int channelId) {
+ void addPendingCredits(int channelId, int delta) {
+ if (delta <= 0) {
+ return;
+ }
synchronized (mConn) {
- if (!pendingChannelCreditsBitmap.get(channelId)) {
+ ChannelControlBlock ccb = ccbArray[channelId];
+ int oldCredits = ccb.getReadCredits();
+ ccb.setReadCredits(oldCredits + delta);
+ if (oldCredits == 0) {
+ assert !pendingChannelCreditsBitmap.get(channelId);
pendingChannelCreditsBitmap.set(channelId);
pendingWriteEventsCounter.increment();
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 1d71fac..ab3d5c1 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -192,7 +192,6 @@
BitSet pendingChannelSynBitmap = cSet.getPendingChannelSynBitmap();
for (int j = pendingChannelSynBitmap.nextSetBit(0); j >= 0; j = pendingChannelSynBitmap.nextSetBit(j)) {
pendingChannelSynBitmap.clear(j);
- pendingWriteEventsCounter.decrement();
writerState.command.setChannelId(j);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.OPEN_CHANNEL);
writerState.command.setData(0);
@@ -200,26 +199,36 @@
if (!writerState.performPendingWrite(sc)) {
return;
}
+ pendingWriteEventsCounter.decrement();
}
BitSet pendingChannelCreditsBitmap = cSet.getPendingChannelCreditsBitmap();
for (int j = pendingChannelCreditsBitmap.nextSetBit(0); j >= 0; j = pendingChannelCreditsBitmap
.nextSetBit(j)) {
- pendingChannelCreditsBitmap.clear(j);
- pendingWriteEventsCounter.decrement();
writerState.command.setChannelId(j);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.ADD_CREDITS);
ChannelControlBlock ccb = cSet.getCCB(j);
- int credits = ccb.getAndResetReadCredits();
- writerState.command.setData(credits);
+ int credits = ccb.getReadCredits();
+ int effectiveCredits;
+ if (credits <= MuxDemuxCommand.MAX_DATA_VALUE) {
+ effectiveCredits = credits;
+ ccb.setReadCredits(0);
+ pendingChannelCreditsBitmap.clear(j);
+ } else {
+ effectiveCredits = MuxDemuxCommand.MAX_DATA_VALUE;
+ ccb.setReadCredits(credits - effectiveCredits);
+ }
+ writerState.command.setData(effectiveCredits);
writerState.reset(null, 0, null);
if (!writerState.performPendingWrite(sc)) {
return;
}
+ if (credits == effectiveCredits) {
+ pendingWriteEventsCounter.decrement();
+ }
}
BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
pendingEOSAckBitmap.clear(j);
- pendingWriteEventsCounter.decrement();
writerState.command.setChannelId(j);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
writerState.command.setData(0);
@@ -227,6 +236,7 @@
if (!writerState.performPendingWrite(sc)) {
return;
}
+ pendingWriteEventsCounter.decrement();
}
BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
index 9124aad..2e2636b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -50,7 +50,7 @@
}
public void setData(int data) throws NetException {
- if (channelId > MAX_DATA_VALUE) {
+ if (data > MAX_DATA_VALUE) {
throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
}
this.data = data;