[NO ISSUE][NET] Ensure Thread Safety in FullFrameChannelReadInterface
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Use a blocking deque in FullFrameChannelReadInterface
to ensure thread safety between frame consumer and
the networking thread.
Change-Id: I33f0171e49b0ff972730a678e8b61a2070dc8832
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2921
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 049cfd8..32bf77e 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,22 +33,20 @@
public class FullFrameChannelReadInterface extends AbstractChannelReadInterface {
private static final Logger LOGGER = LogManager.getLogger();
- private final Deque<ByteBuffer> riEmptyStack;
+ private final BlockingDeque<ByteBuffer> riEmptyStack;
private final IChannelControlBlock ccb;
FullFrameChannelReadInterface(IChannelControlBlock ccb) {
this.ccb = ccb;
- riEmptyStack = new ArrayDeque<>();
+ riEmptyStack = new LinkedBlockingDeque<>();
credits = 0;
emptyBufferAcceptor = buffer -> {
- int delta = buffer.remaining();
- synchronized (ccb) {
- if (ccb.isRemotelyClosed()) {
- return;
- }
- riEmptyStack.push(buffer);
+ if (ccb.isRemotelyClosed()) {
+ return;
}
+ riEmptyStack.push(buffer);
+ final int delta = buffer.remaining();
ccb.addPendingCredits(delta);
};
}