Fixed premature write-suspend bug in network event loop

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1048 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 99140db..7f6853b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -21,12 +21,16 @@
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.exceptions.NetException;
 
 public class ChannelControlBlock {
+    private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
+
     private final ChannelSet cSet;
 
     private final int channelId;
@@ -166,6 +170,9 @@
             public void close() {
                 synchronized (ChannelControlBlock.this) {
                     if (eos) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Received duplicate close() on channel: " + channelId);
+                        }
                         return;
                     }
                     eos = true;
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 238f2ee..1d71fac 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -178,6 +178,7 @@
             if (!writerState.performPendingWrite(sc)) {
                 return;
             }
+            pendingWriteEventsCounter.decrement();
         }
         int numCycles;
 
@@ -239,9 +240,11 @@
             }
             writeCCB.write(writerState);
             if (writerState.writePending()) {
+                pendingWriteEventsCounter.increment();
                 if (!writerState.performPendingWrite(sc)) {
                     return;
                 }
+                pendingWriteEventsCounter.decrement();
             }
         }
     }