[ASTERIXDB-2478][NET] Calculate Buffer Remaining Before Reusing It
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- When recycling a buffer, calculate the buffer remaining before
releasing it for reuse to prevent other threads from changing
its remaining.
- Add test case.
Change-Id: Icca3284feae800dd6c37694bdefec3516cd4c506
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3036
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 1040e81..4ca20ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -60,5 +60,11 @@
<artifactId>hyracks-util</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>2.0.2-beta</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 32bf77e..3ba8627 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -36,7 +36,7 @@
private final BlockingDeque<ByteBuffer> riEmptyStack;
private final IChannelControlBlock ccb;
- FullFrameChannelReadInterface(IChannelControlBlock ccb) {
+ public FullFrameChannelReadInterface(IChannelControlBlock ccb) {
this.ccb = ccb;
riEmptyStack = new LinkedBlockingDeque<>();
credits = 0;
@@ -45,8 +45,8 @@
if (ccb.isRemotelyClosed()) {
return;
}
- riEmptyStack.push(buffer);
final int delta = buffer.remaining();
+ riEmptyStack.push(buffer);
ccb.addPendingCredits(delta);
};
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
new file mode 100644
index 0000000..f9a610c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.net.tests;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+
+@RunWith(Parameterized.class)
+public class FullFrameChannelReadInterfaceTest {
+
+ private static final int TEST_RUNS = 100;
+ private static final int RECEIVER_BUFFER_COUNT = 50;
+ private static int FRAMES_TO_READ_COUNT = 10000;
+ private static final int FRAME_SIZE = StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE);
+ private static final int EXPECTED_CHANNEL_CREDIT = FRAME_SIZE * RECEIVER_BUFFER_COUNT;
+
+ @Parameterized.Parameters
+ public static Object[][] data() {
+ return new Object[TEST_RUNS][0];
+ }
+
+ @Test
+ public void bufferRecycleTest() throws Exception {
+ final AtomicInteger channelCredit = new AtomicInteger();
+ final IChannelControlBlock ccb = mockChannelControlBlock(channelCredit);
+ final ReadBufferFactory bufferFactory = new ReadBufferFactory(RECEIVER_BUFFER_COUNT, FRAME_SIZE);
+ final FullFrameChannelReadInterface readInterface = new FullFrameChannelReadInterface(ccb);
+ final LinkedBlockingDeque<ByteBuffer> fullBufferQ = new LinkedBlockingDeque<>();
+ readInterface.setFullBufferAcceptor(new ReadFullBufferAcceptor(fullBufferQ));
+ readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT, FRAME_SIZE);
+ Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get());
+ final SocketChannel socketChannel = mockSocketChannel(ccb);
+ final Thread networkFrameReader = new Thread(() -> {
+ try {
+ int framesRead = FRAMES_TO_READ_COUNT;
+ while (framesRead > 0) {
+ while (channelCredit.get() == 0) {
+ synchronized (channelCredit) {
+ channelCredit.wait(10000);
+ if (channelCredit.get() == 0) {
+ System.err.println("Sender doesn't have any write credit");
+ System.exit(1);
+ }
+ }
+ }
+ readInterface.read(socketChannel, FRAME_SIZE);
+ framesRead--;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ final Thread frameProcessor = new Thread(() -> {
+ int framesProcessed = 0;
+ try {
+ while (true) {
+ final ByteBuffer fullFrame = fullBufferQ.take();
+ fullFrame.clear();
+ readInterface.getEmptyBufferAcceptor().accept(fullFrame);
+ framesProcessed++;
+ if (framesProcessed == FRAMES_TO_READ_COUNT) {
+ return;
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ networkFrameReader.start();
+ frameProcessor.start();
+ networkFrameReader.join();
+ frameProcessor.join();
+ if (channelCredit.get() != EXPECTED_CHANNEL_CREDIT) {
+ System.err
+ .println("Expected channel credit " + EXPECTED_CHANNEL_CREDIT + " , found " + channelCredit.get());
+ System.exit(1);
+ }
+ }
+
+ private IChannelControlBlock mockChannelControlBlock(AtomicInteger credit) {
+ final ChannelControlBlock ccb = Mockito.mock(ChannelControlBlock.class);
+ Mockito.when(ccb.isRemotelyClosed()).thenReturn(false);
+ Mockito.doAnswer(invocation -> {
+ final Integer delta = invocation.getArgumentAt(0, Integer.class);
+ credit.addAndGet(delta);
+ synchronized (credit) {
+ credit.notifyAll();
+ }
+ return null;
+ }).when(ccb).addPendingCredits(Mockito.anyInt());
+ return ccb;
+ }
+
+ private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException {
+ final SocketChannel sc = Mockito.mock(SocketChannel.class);
+ Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> {
+ ccb.addPendingCredits(-FRAME_SIZE);
+ final ByteBuffer buffer = invocation.getArgumentAt(0, ByteBuffer.class);
+ while (buffer.hasRemaining()) {
+ buffer.put((byte) 0);
+ }
+ return FRAME_SIZE;
+ });
+ return sc;
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ private final BlockingQueue<ByteBuffer> fullBufferQ;
+
+ ReadFullBufferAcceptor(BlockingQueue<ByteBuffer> fullBuffer) {
+ this.fullBufferQ = fullBuffer;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullBufferQ.add(buffer);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void error(int ecode) {
+ }
+ }
+
+ public class ReadBufferFactory implements IBufferFactory {
+ private final int limit;
+ private final int frameSize;
+ private int counter = 0;
+
+ ReadBufferFactory(int limit, int frameSize) {
+ this.limit = limit;
+ this.frameSize = frameSize;
+ }
+
+ @Override
+ public ByteBuffer createBuffer() {
+ if (counter >= limit) {
+ throw new IllegalStateException("Buffer limit exceeded");
+ }
+ counter++;
+ return ByteBuffer.allocate(frameSize);
+ }
+ }
+}
\ No newline at end of file