[NO ISSUE][STO] Unpin pages when interrupted during reads

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- This change fixes a bug that happens when a thread pinning
  a page that is not already in the buffer cache is interrupted.
- The fix is that if a failure happens during the read call,
  the page is unpinned.
- A test case (BufferCacheTest.interruptPinTest) is added.

Change-Id: I8d1c52fcf89ed90e8ef6019cd77842dd7468df49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2274
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
index f93df0a..bb31885 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
@@ -68,5 +68,13 @@
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index 26ad457..f94914c 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -26,8 +26,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -39,11 +46,15 @@
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
 import org.apache.hyracks.test.support.TestUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class BufferCacheTest {
+    private static final Logger LOGGER = LogManager.getLogger();
     protected static final List<String> openedFiles = new ArrayList<>();
     protected static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
 
@@ -62,6 +73,102 @@
     }
 
     @Test
+    public void interruptPinTest() throws Exception {
+        /*
+         * This test creates a small buffer cache (4 pages) and a file of 16 pages,
+         * then has 4 reader threads repeatedly pin and unpin the pages one by one
+         * while the main thread interrupts them every 25 ms for about 20 seconds.
+         * Each reader closes the file when it is done; finally the file is deleted and
+         * the cache is closed (presumably surfacing any page that was left pinned).
+         */
+        final int bufferCacheNumPages = 4;
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, bufferCacheNumPages, MAX_OPEN_FILES);
+        IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+        IBufferCache bufferCache =
+                TestStorageManagerComponentHolder.getBufferCache(ctx.getJobletContext().getServiceContext());
+        final long duration = TimeUnit.SECONDS.toMillis(20);
+        final String fileName = getFileName();
+        final FileReference file = ioManager.resolve(fileName);
+        final int fileId = bufferCache.createFile(file);
+        final int numPages = 16;
+        bufferCache.openFile(fileId);
+        for (int i = 0; i < numPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
+            ICachedPage page = bufferCache.confiscatePage(dpid);
+            page.getBuffer().putInt(0, i);
+            bufferCache.createFIFOQueue().put(page);
+        }
+        bufferCache.finishQueue();
+        bufferCache.closeFile(fileId);
+        ExecutorService executor = Executors.newFixedThreadPool(bufferCacheNumPages);
+        MutableObject<Thread>[] readers = new MutableObject[bufferCacheNumPages];
+        Future<Void>[] futures = new Future[bufferCacheNumPages];
+        for (int i = 0; i < bufferCacheNumPages; i++) {
+            readers[i] = new MutableObject<>();
+            final int threadNumber = i;
+            futures[i] = executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    synchronized (readers[threadNumber]) {
+                        readers[threadNumber].setValue(Thread.currentThread());
+                        readers[threadNumber].notifyAll(); // wake the main thread waiting on this holder
+                    }
+                    // For `duration` ms, pin and unpin the file's pages round-robin (the first pin is page 1).
+                    // When the time is up, close the file and log the read counters.
+                    bufferCache.openFile(fileId);
+                    final long start = System.currentTimeMillis();
+                    int pageNumber = 0;
+                    int totalReads = 0;
+                    int successfulReads = 0;
+                    int interruptedReads = 0;
+                    while (System.currentTimeMillis() - start < duration) {
+                        totalReads++;
+                        pageNumber = (pageNumber + 1) % numPages;
+                        try {
+                            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageNumber);
+                            ICachedPage page = bufferCache.pin(dpid, false);
+                            successfulReads++;
+                            bufferCache.unpin(page);
+                        } catch (HyracksDataException hde) {
+                            interruptedReads++;
+                            // any pin/unpin failure lands here; reset this thread's interrupt flag
+                            Thread.interrupted();
+                        }
+                    }
+                    bufferCache.closeFile(fileId);
+                    LOGGER.log(Level.INFO, "Total reads = " + totalReads + " Successful Reads = " + successfulReads
+                            + " Interrupted Reads = " + interruptedReads);
+                    return null;
+                }
+            });
+        }
+
+        for (int i = 0; i < bufferCacheNumPages; i++) {
+            synchronized (readers[i]) {
+                while (readers[i].getValue() == null) {
+                    readers[i].wait();
+                }
+            }
+        }
+        final long start = System.currentTimeMillis();
+
+        while (System.currentTimeMillis() - start < duration) {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                readers[i].getValue().interrupt();
+            }
+            Thread.sleep(25); // NOSONAR Sleep so some reads are successful
+        }
+        try {
+            for (int i = 0; i < bufferCacheNumPages; i++) {
+                futures[i].get();
+            }
+        } finally {
+            bufferCache.deleteFile(fileId);
+            bufferCache.close();
+        }
+    }
+
+    @Test
     public void simpleOpenPinCloseTest() throws HyracksException {
         TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
         IBufferCache bufferCache =