Fixed issues on Mac OS X

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1179 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
index 73dc16d..18f0e75 100644
--- a/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
+++ b/hyracks-algebricks/hyracks-algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/tools/WriteValueTest.java
@@ -80,7 +80,7 @@
         dout.writeUTF(str);
         baaos.reset();
         WriteValueTools.writeUTF8String(interm.getByteArray(), 0, interm.size(), baaos);
-        byte[] b = str.getBytes();
+        byte[] b = str.getBytes("UTF-8");
         if (baaos.size() != b.length + 2) {
             throw new Exception("Expecting to write " + b + " in " + b.length + " bytes, but found " + baaos.size()
                     + " bytes.");
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index d6f3fd1..4b55d4b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -21,7 +21,6 @@
 import java.util.Deque;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -152,14 +151,14 @@
     private final class WriteInterface implements IChannelWriteInterface {
         private final Queue<ByteBuffer> wiFullQueue;
 
-        private int channelWriteEventCount;
+        private boolean channelWritabilityState;
 
         private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
             @Override
             public void accept(ByteBuffer buffer) {
                 synchronized (ChannelControlBlock.this) {
                     wiFullQueue.add(buffer);
-                    incrementLocalWriteEventCount();
+                    adjustChannelWritability();
                 }
             }
 
@@ -173,7 +172,7 @@
                         return;
                     }
                     eos = true;
-                    incrementLocalWriteEventCount();
+                    adjustChannelWritability();
                 }
             }
 
@@ -181,14 +180,14 @@
             public void error(int ecode) {
                 synchronized (ChannelControlBlock.this) {
                     WriteInterface.this.ecode = ecode;
-                    incrementLocalWriteEventCount();
+                    adjustChannelWritability();
                 }
             }
         };
 
         private IBufferAcceptor eba;
 
-        private final AtomicInteger credits;
+        private int credits;
 
         private boolean eos;
 
@@ -202,7 +201,7 @@
 
         WriteInterface() {
             wiFullQueue = new ArrayDeque<ByteBuffer>();
-            credits = new AtomicInteger();
+            credits = 0;
             eos = false;
             eosSent = false;
             ecode = -1;
@@ -224,30 +223,32 @@
                 currentWriteBuffer = wiFullQueue.poll();
             }
             if (currentWriteBuffer != null) {
-                int size = Math.min(currentWriteBuffer.remaining(), credits.get());
+                int size = Math.min(currentWriteBuffer.remaining(), credits);
                 if (size > 0) {
-                    credits.addAndGet(-size);
+                    credits -= size;
                     writerState.command.setChannelId(channelId);
                     writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
                     writerState.command.setData(size);
                     writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
+                } else {
+                    adjustChannelWritability();
                 }
             } else if (ecode >= 0 && !ecodeSent) {
-                decrementLocalWriteEventCount();
                 writerState.command.setChannelId(channelId);
                 writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
                 writerState.command.setData(ecode);
                 writerState.reset(null, 0, null);
                 ecodeSent = true;
                 localClose.set(true);
+                adjustChannelWritability();
             } else if (wi.eos && !wi.eosSent) {
-                decrementLocalWriteEventCount();
                 writerState.command.setChannelId(channelId);
                 writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
                 writerState.command.setData(0);
                 writerState.reset(null, 0, null);
                 eosSent = true;
                 localClose.set(true);
+                adjustChannelWritability();
             }
         }
 
@@ -255,23 +256,37 @@
             if (currentWriteBuffer.remaining() <= 0) {
                 currentWriteBuffer.clear();
                 eba.accept(currentWriteBuffer);
-                decrementLocalWriteEventCount();
                 currentWriteBuffer = null;
+                adjustChannelWritability();
             }
         }
 
-        void incrementLocalWriteEventCount() {
-            ++channelWriteEventCount;
-            if (channelWriteEventCount == 1) {
-                cSet.markPendingWrite(channelId);
+        private boolean computeWritability() {
+            boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
+            if (writableDataPresent) {
+                return credits > 0;
             }
+            if (eos && !eosSent) {
+                return true;
+            }
+            if (ecode >= 0 && !ecodeSent) {
+                return true;
+            }
+            return false;
         }
 
-        void decrementLocalWriteEventCount() {
-            --channelWriteEventCount;
-            if (channelWriteEventCount == 0) {
-                cSet.unmarkPendingWrite(channelId);
+        void adjustChannelWritability() {
+            boolean writable = computeWritability();
+            if (writable) {
+                if (!channelWritabilityState) {
+                    cSet.markPendingWrite(channelId);
+                }
+            } else {
+                if (channelWritabilityState) {
+                    cSet.unmarkPendingWrite(channelId);
+                }
             }
+            channelWritabilityState = writable;
         }
     }
 
@@ -295,8 +310,9 @@
         this.ri.credits = credits;
     }
 
-    void addWriteCredits(int delta) {
-        wi.credits.addAndGet(delta);
+    synchronized void addWriteCredits(int delta) {
+        wi.credits += delta;
+        wi.adjustChannelWritability();
     }
 
     synchronized void reportRemoteEOS() {