Fixed potential infinite wait bug in writer state machine. Fixed credit transmission to respect command data bounds.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1069 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 7f6853b..e3d057a 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -76,27 +76,23 @@
                 if (remoteClose.get()) {
                     return;
                 }
-                int delta;
+                int delta = buffer.remaining();
                 synchronized (ChannelControlBlock.this) {
                     riEmptyQueue.add(buffer);
-                    delta = buffer.remaining();
                 }
-                credits.addAndGet(delta);
-                if (delta != 0) {
-                    cSet.markPendingCredits(channelId);
-                }
+                cSet.addPendingCredits(channelId, delta);
             }
         };
 
         private ICloseableBufferAcceptor fba;
 
-        private final AtomicInteger credits;
+        private volatile int credits;
 
         private ByteBuffer currentReadBuffer;
 
         ReadInterface() {
             riEmptyQueue = new LinkedList<ByteBuffer>();
-            credits = new AtomicInteger();
+            credits = 0;
         }
 
         @Override
@@ -290,12 +286,12 @@
         return ri.read(sc, size);
     }
 
-    void addReadCredits(int delta) {
-        ri.credits.addAndGet(delta);
+    int getReadCredits() {
+        return ri.credits;
     }
 
-    int getAndResetReadCredits() {
-        return ri.credits.getAndSet(0);
+    void setReadCredits(int credits) {
+        this.ri.credits = credits;
     }
 
     void addWriteCredits(int delta) {
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index 9411d42..104f98a 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -131,9 +131,16 @@
         }
     }
 
-    void markPendingCredits(int channelId) {
+    void addPendingCredits(int channelId, int delta) {
+        if (delta <= 0) {
+            return;
+        }
         synchronized (mConn) {
-            if (!pendingChannelCreditsBitmap.get(channelId)) {
+            ChannelControlBlock ccb = ccbArray[channelId];
+            int oldCredits = ccb.getReadCredits();
+            ccb.setReadCredits(oldCredits + delta);
+            if (oldCredits == 0) {
+                assert !pendingChannelCreditsBitmap.get(channelId);
                 pendingChannelCreditsBitmap.set(channelId);
                 pendingWriteEventsCounter.increment();
             }
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 1d71fac..ab3d5c1 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -192,7 +192,6 @@
                 BitSet pendingChannelSynBitmap = cSet.getPendingChannelSynBitmap();
                 for (int j = pendingChannelSynBitmap.nextSetBit(0); j >= 0; j = pendingChannelSynBitmap.nextSetBit(j)) {
                     pendingChannelSynBitmap.clear(j);
-                    pendingWriteEventsCounter.decrement();
                     writerState.command.setChannelId(j);
                     writerState.command.setCommandType(MuxDemuxCommand.CommandType.OPEN_CHANNEL);
                     writerState.command.setData(0);
@@ -200,26 +199,36 @@
                     if (!writerState.performPendingWrite(sc)) {
                         return;
                     }
+                    pendingWriteEventsCounter.decrement();
                 }
                 BitSet pendingChannelCreditsBitmap = cSet.getPendingChannelCreditsBitmap();
                 for (int j = pendingChannelCreditsBitmap.nextSetBit(0); j >= 0; j = pendingChannelCreditsBitmap
                         .nextSetBit(j)) {
-                    pendingChannelCreditsBitmap.clear(j);
-                    pendingWriteEventsCounter.decrement();
                     writerState.command.setChannelId(j);
                     writerState.command.setCommandType(MuxDemuxCommand.CommandType.ADD_CREDITS);
                     ChannelControlBlock ccb = cSet.getCCB(j);
-                    int credits = ccb.getAndResetReadCredits();
-                    writerState.command.setData(credits);
+                    int credits = ccb.getReadCredits();
+                    int effectiveCredits;
+                    if (credits <= MuxDemuxCommand.MAX_DATA_VALUE) {
+                        effectiveCredits = credits;
+                        ccb.setReadCredits(0);
+                        pendingChannelCreditsBitmap.clear(j);
+                    } else {
+                        effectiveCredits = MuxDemuxCommand.MAX_DATA_VALUE;
+                        ccb.setReadCredits(credits - effectiveCredits);
+                    }
+                    writerState.command.setData(effectiveCredits);
                     writerState.reset(null, 0, null);
                     if (!writerState.performPendingWrite(sc)) {
                         return;
                     }
+                    if (credits == effectiveCredits) {
+                        pendingWriteEventsCounter.decrement();
+                    }
                 }
                 BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
                 for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
                     pendingEOSAckBitmap.clear(j);
-                    pendingWriteEventsCounter.decrement();
                     writerState.command.setChannelId(j);
                     writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
                     writerState.command.setData(0);
@@ -227,6 +236,7 @@
                     if (!writerState.performPendingWrite(sc)) {
                         return;
                     }
+                    pendingWriteEventsCounter.decrement();
                 }
                 BitSet pendingChannelWriteBitmap = cSet.getPendingChannelWriteBitmap();
                 lastChannelWritten = pendingChannelWriteBitmap.nextSetBit(lastChannelWritten + 1);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
index 9124aad..2e2636b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -50,7 +50,7 @@
     }
 
     public void setData(int data) throws NetException {
-        if (channelId > MAX_DATA_VALUE) {
+        if (data > MAX_DATA_VALUE) {
             throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
         }
         this.data = data;