Fixed issues on Mac OS X
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1179 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
index 73dc16d..18f0e75 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
@@ -80,7 +80,7 @@
dout.writeUTF(str);
baaos.reset();
WriteValueTools.writeUTF8String(interm.getByteArray(), 0, interm.size(), baaos);
- byte[] b = str.getBytes();
+ byte[] b = str.getBytes("UTF-8");
if (baaos.size() != b.length + 2) {
throw new Exception("Expecting to write " + b + " in " + b.length + " bytes, but found " + baaos.size()
+ " bytes.");
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index d6f3fd1..4b55d4b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -21,7 +21,6 @@
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -152,14 +151,14 @@
private final class WriteInterface implements IChannelWriteInterface {
private final Queue<ByteBuffer> wiFullQueue;
- private int channelWriteEventCount;
+ private boolean channelWritabilityState;
private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
@Override
public void accept(ByteBuffer buffer) {
synchronized (ChannelControlBlock.this) {
wiFullQueue.add(buffer);
- incrementLocalWriteEventCount();
+ adjustChannelWritability();
}
}
@@ -173,7 +172,7 @@
return;
}
eos = true;
- incrementLocalWriteEventCount();
+ adjustChannelWritability();
}
}
@@ -181,14 +180,14 @@
public void error(int ecode) {
synchronized (ChannelControlBlock.this) {
WriteInterface.this.ecode = ecode;
- incrementLocalWriteEventCount();
+ adjustChannelWritability();
}
}
};
private IBufferAcceptor eba;
- private final AtomicInteger credits;
+ private int credits;
private boolean eos;
@@ -202,7 +201,7 @@
WriteInterface() {
wiFullQueue = new ArrayDeque<ByteBuffer>();
- credits = new AtomicInteger();
+ credits = 0;
eos = false;
eosSent = false;
ecode = -1;
@@ -224,30 +223,32 @@
currentWriteBuffer = wiFullQueue.poll();
}
if (currentWriteBuffer != null) {
- int size = Math.min(currentWriteBuffer.remaining(), credits.get());
+ int size = Math.min(currentWriteBuffer.remaining(), credits);
if (size > 0) {
- credits.addAndGet(-size);
+ credits -= size;
writerState.command.setChannelId(channelId);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
writerState.command.setData(size);
writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
+ } else {
+ adjustChannelWritability();
}
} else if (ecode >= 0 && !ecodeSent) {
- decrementLocalWriteEventCount();
writerState.command.setChannelId(channelId);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
writerState.command.setData(ecode);
writerState.reset(null, 0, null);
ecodeSent = true;
localClose.set(true);
+ adjustChannelWritability();
} else if (wi.eos && !wi.eosSent) {
- decrementLocalWriteEventCount();
writerState.command.setChannelId(channelId);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
writerState.command.setData(0);
writerState.reset(null, 0, null);
eosSent = true;
localClose.set(true);
+ adjustChannelWritability();
}
}
@@ -255,23 +256,37 @@
if (currentWriteBuffer.remaining() <= 0) {
currentWriteBuffer.clear();
eba.accept(currentWriteBuffer);
- decrementLocalWriteEventCount();
currentWriteBuffer = null;
+ adjustChannelWritability();
}
}
- void incrementLocalWriteEventCount() {
- ++channelWriteEventCount;
- if (channelWriteEventCount == 1) {
- cSet.markPendingWrite(channelId);
+ private boolean computeWritability() {
+ boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+ if (writableDataPresent) {
+ return credits > 0;
}
+ if (eos && !eosSent) {
+ return true;
+ }
+ if (ecode >= 0 && !ecodeSent) {
+ return true;
+ }
+ return false;
}
- void decrementLocalWriteEventCount() {
- --channelWriteEventCount;
- if (channelWriteEventCount == 0) {
- cSet.unmarkPendingWrite(channelId);
+ void adjustChannelWritability() {
+ boolean writable = computeWritability();
+ if (writable) {
+ if (!channelWritabilityState) {
+ cSet.markPendingWrite(channelId);
+ }
+ } else {
+ if (channelWritabilityState) {
+ cSet.unmarkPendingWrite(channelId);
+ }
}
+ channelWritabilityState = writable;
}
}
@@ -295,8 +310,9 @@
this.ri.credits = credits;
}
- void addWriteCredits(int delta) {
- wi.credits.addAndGet(delta);
+ synchronized void addWriteCredits(int delta) {
+ wi.credits += delta;
+ wi.adjustChannelWritability();
}
synchronized void reportRemoteEOS() {