[NO ISSUE][STO] Unpin pages when interrupted during reads
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- This change fixes a bug that occurs when a thread is interrupted while
pinning a page that is not already in the buffer cache.
- The fix is that if a failure happens during the read call,
the page is unpinned.
- A test case (BufferCacheTest.interruptPinTest) is added.
Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2274
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
index f93df0a..bb31885 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
@@ -68,5 +68,13 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 26ad457..f94914c 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -26,8 +26,15 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -39,11 +46,15 @@
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
import org.apache.hyracks.test.support.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
public class BufferCacheTest {
+ private static final Logger LOGGER = LogManager.getLogger();
protected static final List<String> openedFiles = new ArrayList<>();
protected static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
@@ -62,6 +73,102 @@
}
@Test
+ public void interruptPinTest() throws Exception {
+ /*
+ * This test creates a small buffer cache (4 pages) and a file of 16 pages,
+ * then has 4 reader threads pin and unpin the pages one by one while the
+ * main thread repeatedly interrupts them for the same duration. Each reader
+ * then closes the file; the test finally deletes the file and closes the cache.
+ * The intent is to check that no page stays pinned after an interrupted read.
+ */
+ final int bufferCacheNumPages = 4;
+ TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages, MAX_OPEN_FILES);
+ IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+ IBufferCache bufferCache =
+ TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+ final long duration = TimeUnit.SECONDS.toMillis(20); // 20 seconds, expressed in milliseconds
+ final String fileName = getFileName();
+ final FileReference file = ioManager.resolve(fileName);
+ final int fileId = bufferCache.createFile(file);
+ final int numPages = 16;
+ bufferCache.openFile(fileId);
+ for (int i = 0; i < numPages; i++) {
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
+ ICachedPage page = bufferCache.confiscatePage(dpid);
+ page.getBuffer().putInt(0, i); // stamp the page with its own page number at offset 0
+ bufferCache.createFIFOQueue().put(page);
+ }
+ bufferCache.finishQueue();
+ bufferCache.closeFile(fileId);
+ ExecutorService executor = Executors.newFixedThreadPool(bufferCacheNumPages); // NOTE(review): never shut down in this method
+ MutableObject<Thread>[] readers = new MutableObject[bufferCacheNumPages];
+ Future<Void>[] futures = new Future[bufferCacheNumPages];
+ for (int i = 0; i < bufferCacheNumPages; i++) {
+ readers[i] = new MutableObject<>();
+ final int threadNumber = i;
+ futures[i] = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ synchronized (readers[threadNumber]) {
+ readers[threadNumber].setValue(Thread.currentThread()); // publish this reader's thread so it can be interrupted
+ readers[threadNumber].notifyAll(); // wake the main thread waiting on this holder
+ }
+ // For the configured duration, pin and unpin the pages in round-robin order.
+ // When the time is up, close the file.
+ bufferCache.openFile(fileId);
+ final long start = System.currentTimeMillis();
+ int pageNumber = 0;
+ int totalReads = 0;
+ int successfulReads = 0;
+ int interruptedReads = 0;
+ while (System.currentTimeMillis() - start < duration) {
+ totalReads++;
+ pageNumber = (pageNumber + 1) % numPages;
+ try {
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageNumber);
+ ICachedPage page = bufferCache.pin(dpid, false);
+ successfulReads++;
+ bufferCache.unpin(page);
+ } catch (HyracksDataException hde) { // NOTE(review): every HyracksDataException is counted as an interrupted read
+ interruptedReads++;
+ // clear the thread's interrupt flag before the next pin attempt
+ Thread.interrupted();
+ }
+ }
+ bufferCache.closeFile(fileId);
+ LOGGER.log(Level.INFO, "Total reads = " + totalReads + " Successful Reads = " + successfulReads
+ + " Interrupted Reads = " + interruptedReads);
+ return null;
+ }
+ });
+ }
+
+ for (int i = 0; i < bufferCacheNumPages; i++) { // block until every reader has published its thread
+ synchronized (readers[i]) {
+ while (readers[i].getValue() == null) {
+ readers[i].wait();
+ }
+ }
+ }
+ final long start = System.currentTimeMillis();
+
+ while (System.currentTimeMillis() - start < duration) { // interrupt all readers repeatedly for the same duration
+ for (int i = 0; i < bufferCacheNumPages; i++) {
+ readers[i].getValue().interrupt();
+ }
+ Thread.sleep(25); // NOSONAR Sleep so some reads are successful
+ }
+ try {
+ for (int i = 0; i < bufferCacheNumPages; i++) {
+ futures[i].get(); // waits for the reader; a failure inside call() is rethrown here
+ }
+ } finally {
+ bufferCache.deleteFile(fileId);
+ bufferCache.close();
+ }
+ }
+
+ @Test
public void simpleOpenPinCloseTest() throws HyracksException {
TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
IBufferCache bufferCache =