[NO ISSUE][NET] Ensure Recycling Buffer and Notifying Sender is Atomic

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- To avoid synchronization issues that might occur
  due to JVM reordering, ensure that both recycling
  read buffers and notifying the sender  of their
  availability are done atomically before the next
  buffer is received from the sender.

Change-Id: Ia3b1920f33bf7d4e7efbd2ea3405cbc4310a78c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3520
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
(cherry picked from commit 32eed5f384c5851eae1c613fcb3b9532744ed595)
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3525
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 3ba8627..53be212 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.ArrayDeque;
+import java.util.Deque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,65 +33,69 @@
 public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final BlockingDeque<ByteBuffer> riEmptyStack;
+    private final Deque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
+    private final Object bufferRecycleLock = new Object();
 
     public FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
-        riEmptyStack = new LinkedBlockingDeque<>();
+        riEmptyStack = new ArrayDeque<>();
         credits = 0;
-
         emptyBufferAcceptor = buffer -> {
-            if (ccb.isRemotelyClosed()) {
-                return;
-            }
             final int delta = buffer.remaining();
-            riEmptyStack.push(buffer);
-            ccb.addPendingCredits(delta);
+            synchronized (bufferRecycleLock) {
+                if (ccb.isRemotelyClosed()) {
+                    return;
+                }
+                riEmptyStack.push(buffer);
+                ccb.addPendingCredits(delta);
+            }
         };
     }
 
     @Override
     public int read(SocketChannel sc, int size) throws IOException, NetException {
-        while (true) {
-            if (size <= 0) {
-                return size;
-            }
-            if (currentReadBuffer == null) {
-                currentReadBuffer = riEmptyStack.poll();
-                //if current buffer == null and limit not reached
-                // factory.createBuffer factory
-                if (currentReadBuffer == null) {
-                    currentReadBuffer = bufferFactory.createBuffer();
-                }
-            }
-            if (currentReadBuffer == null) {
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size());
-                }
-                throw new IllegalStateException(ccb + " read buffers exceeded");
-            }
-            int rSize = Math.min(size, currentReadBuffer.remaining());
-            if (rSize > 0) {
-                currentReadBuffer.limit(currentReadBuffer.position() + rSize);
-                int len;
-                try {
-                    len = sc.read(currentReadBuffer);
-                    if (len < 0) {
-                        throw new NetException("Socket Closed");
-                    }
-                } finally {
-                    currentReadBuffer.limit(currentReadBuffer.capacity());
-                }
-                size -= len;
-                if (len < rSize) {
+        synchronized (bufferRecycleLock) {
+            while (true) {
+                if (size <= 0) {
                     return size;
                 }
-            } else {
-                return size;
-            }
-            if (currentReadBuffer.remaining() <= 0) {
-                flush();
+                if (currentReadBuffer == null) {
+                    currentReadBuffer = riEmptyStack.poll();
+                    //if current buffer == null and limit not reached
+                    // factory.createBuffer factory
+                    if (currentReadBuffer == null) {
+                        currentReadBuffer = bufferFactory.createBuffer();
+                    }
+                }
+                if (currentReadBuffer == null) {
+                    if (LOGGER.isWarnEnabled()) {
+                        LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size());
+                    }
+                    throw new IllegalStateException(ccb + " read buffers exceeded");
+                }
+                int rSize = Math.min(size, currentReadBuffer.remaining());
+                if (rSize > 0) {
+                    currentReadBuffer.limit(currentReadBuffer.position() + rSize);
+                    int len;
+                    try {
+                        len = sc.read(currentReadBuffer);
+                        if (len < 0) {
+                            throw new NetException("Socket Closed");
+                        }
+                    } finally {
+                        currentReadBuffer.limit(currentReadBuffer.capacity());
+                    }
+                    size -= len;
+                    if (len < rSize) {
+                        return size;
+                    }
+                } else {
+                    return size;
+                }
+                if (currentReadBuffer.remaining() <= 0) {
+                    flush();
+                }
             }
         }
     }