ASTERIXDB-1438: BufferCache spins indefinitely...

Fixes ASTERIXDB-1438: BufferCache spins indefinitely when cache is exceeded

After three unsuccessful cycles (each traversing clock pages three times)
and waiting for dirty pages to be cleaned, an exception is thrown

Change-Id: I327a7423bd630c96e16601b1a3a2a21f558f9f50
Reviewed-on: https://asterix-gerrit.ics.uci.edu/894
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
index 1bbe6db..114dcb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
@@ -125,6 +125,7 @@
                 }
             }
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             // TODO what do we do here?
             e.printStackTrace();
         }
@@ -140,6 +141,7 @@
             try {
                 entry = queue.take();
             } catch(InterruptedException e) {
+                Thread.currentThread().interrupt();
                 break;
             }
             if (entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()){
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 2b6dd73..201526b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -54,6 +54,7 @@
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
+    private static final int MAX_PIN_ATTEMPT_CYCLES = 3;
     public static final boolean DEBUG = false;
 
     private final int pageSize;
@@ -74,7 +75,7 @@
     private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
     //!DEBUG
     private IIOReplicationManager ioReplicationManager;
-    private List<ICachedPageInternal> cachedPages = new ArrayList<>();
+    private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
 
     private boolean closed;
 
@@ -94,16 +95,16 @@
         this.fileMapManager = fileMapManager;
 
         Executor executor = Executors.newCachedThreadPool(threadFactory);
-        fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
-        virtualFiles = new HashSet<Integer>();
+        fileInfoMap = new HashMap<>();
+        virtualFiles = new HashSet<>();
         cleanerThread = new CleanerThread();
         executor.execute(cleanerThread);
         closed = false;
 
         fifoWriter = new AsyncFIFOPageQueueManager(this);
         if( DEBUG ) {
-            confiscatedPages = new ArrayList<CachedPage>();
-            confiscatedPagesOwner = new HashMap<CachedPage, StackTraceElement[]>();
+            confiscatedPages = new ArrayList<>();
+            confiscatedPagesOwner = new HashMap<>();
             confiscateLock = new ReentrantLock();
             pinnedPageOwner = new ConcurrentHashMap<>();
         }
@@ -135,7 +136,7 @@
 
         // check whether file has been created and opened
         int fileId = BufferedFileHandle.getFileId(dpid);
-        BufferedFileHandle fInfo = null;
+        BufferedFileHandle fInfo;
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
         }
@@ -185,7 +186,7 @@
         if (DEBUG) {
             pinSanityCheck(dpid);
         }
-        CachedPage cPage = findPage(dpid, false);
+        CachedPage cPage = findPage(dpid);
         if (!newPage) {
             if (DEBUG) {
                 confiscateLock.lock();
@@ -220,13 +221,9 @@
         return cPage;
     }
 
-    private boolean isVirtual(long vpid) throws HyracksDataException {
-        CachedPage virtPage = findPage(vpid, true);
-        return virtPage.confiscated.get();
-    }
+    private CachedPage findPage(long dpid) throws HyracksDataException {
 
-    private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
-        while (true) {
+        for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
             int startCleanedCount = cleanerThread.cleanedCount;
 
             CachedPage cPage = null;
@@ -428,11 +425,11 @@
                             victimBucket.cachedPage = victim.next;
                         } else {
                             CachedPage victimPrev = victimBucket.cachedPage;
-                            while (victimPrev != null && victimPrev.next != victim) {
+                            while (victimPrev.next != victim) {
                                 victimPrev = victimPrev.next;
-                            }
-                            if(DEBUG) {
-                                assert victimPrev != null;
+                                if (victimPrev == null) {
+                                    throw new IllegalStateException();
+                                }
                             }
                             victimPrev.next = victim.next;
                         }
@@ -449,8 +446,13 @@
                     return victim;
                 }
             }
-            synchronized (cleanerThread) {
-                pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+            synchronized (cleanerThread.threadLock) {
+                try {
+                    pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+                } catch (InterruptedException e) {
+                    // Re-interrupt the thread so this gets handled later
+                    Thread.currentThread().interrupt();
+                }
             }
             // Heuristic optimization. Check whether the cleaner thread has
             // cleaned pages since we did our last pin attempt.
@@ -461,12 +463,18 @@
             }
             synchronized (cleanerThread.cleanNotification) {
                 try {
-                    cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                    // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+                    do {
+                        cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                    } while (false);
                 } catch (InterruptedException e) {
-                    // Do nothing
+                    // Re-interrupt the thread so this gets handled later
+                    Thread.currentThread().interrupt();
                 }
             }
         }
+        throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
+                " cycles (buffer cache undersized?)");
     }
 
     private String dumpState() {
@@ -651,21 +659,15 @@
         }
     }
 
-    private class CleanerThread extends Thread {
-        private boolean shutdownStart = false;
-        private boolean shutdownComplete = false;
+    private class CleanerThread implements Runnable {
+        private volatile boolean shutdownStart = false;
+        private volatile boolean shutdownComplete = false;
+        private final Object threadLock = new Object();
         private final Object cleanNotification = new Object();
         // Simply keeps incrementing this counter when a page is cleaned.
         // Used to implement wait-for-cleanerthread heuristic optimizations.
         // A waiter can detect whether pages have been cleaned.
-        // No need to make this var volatile or synchronize it's access in any
-        // way because it is used for heuristics.
-        private int cleanedCount = 0;
-
-        public CleanerThread() {
-            setPriority(Thread.NORM_PRIORITY);
-            setDaemon(true);
-        }
+        private volatile int cleanedCount = 0;
 
         public void cleanPage(CachedPage cPage, boolean force) {
             if (cPage.dirty.get() && !cPage.confiscated.get()) {
@@ -678,24 +680,7 @@
                 }
                 if (proceed) {
                     try {
-                        // Make sure page is still dirty.
-                        if (!cPage.dirty.get()) {
-                            return;
-                        }
-                        boolean cleaned = true;
-                        try {
-                            write(cPage);
-                        } catch (HyracksDataException e) {
-                            cleaned = false;
-                        }
-                        if (cleaned) {
-                            cPage.dirty.set(false);
-                            cPage.pinCount.decrementAndGet();
-                            cleanedCount++;
-                            synchronized (cleanNotification) {
-                                cleanNotification.notifyAll();
-                            }
-                        }
+                        cleanPageLocked(cPage);
                     } finally {
                         if (force) {
                             cPage.latch.writeLock().unlock();
@@ -710,34 +695,63 @@
             }
         }
 
-        @Override
-        public synchronized void run() {
+        private void cleanPageLocked(CachedPage cPage) {
+            // Make sure page is still dirty.
+            if (!cPage.dirty.get()) {
+                return;
+            }
+            boolean cleaned = true;
             try {
-                while (true) {
-                    pageCleanerPolicy.notifyCleanCycleStart(this);
-                    int curPage = 0;
-                    while (true) {
-                        synchronized (cachedPages) {
-                            if (curPage >= cachedPages.size()) {
-                                break;
-                            }
-                            CachedPage cPage = (CachedPage) cachedPages.get(curPage);
-                            if (cPage != null) {
-                                cleanPage(cPage, false);
-                            }
-                        }
-                        curPage++;
+                write(cPage);
+            } catch (HyracksDataException e) {
+                LOGGER.log(Level.WARNING, "Unable to write dirty page", e);
+                cleaned = false;
+            }
+            if (cleaned) {
+                cPage.dirty.set(false);
+                cPage.pinCount.decrementAndGet();
+                // this increment of a volatile is OK as there is only one writer
+                cleanedCount++;
+                synchronized (cleanNotification) {
+                    cleanNotification.notifyAll();
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            synchronized (threadLock) {
+                try {
+                    while (!shutdownStart) {
+                        runCleanCycle();
                     }
-                    if (shutdownStart) {
+                } catch (InterruptedException e) {
+
+                    Thread.currentThread().interrupt();
+                } finally {
+                    shutdownComplete = true;
+                    threadLock.notifyAll();
+                }
+            }
+        }
+
+        private void runCleanCycle() throws InterruptedException {
+            pageCleanerPolicy.notifyCleanCycleStart(threadLock);
+            int curPage = 0;
+            while (true) {
+                synchronized (cachedPages) {
+                    if (curPage >= cachedPages.size()) {
                         break;
                     }
-                    pageCleanerPolicy.notifyCleanCycleFinish(this);
+                    CachedPage cPage = (CachedPage) cachedPages.get(curPage);
+                    if (cPage != null) {
+                        cleanPage(cPage, false);
+                    }
                 }
-            } catch (Exception e) {
-                e.printStackTrace();
-            } finally {
-                shutdownComplete = true;
-                notifyAll();
+                curPage++;
+            }
+            if (!shutdownStart) {
+                pageCleanerPolicy.notifyCleanCycleFinish(threadLock);
             }
         }
     }
@@ -746,29 +760,31 @@
     public void close() {
         closed = true;
         fifoWriter.destroyQueue();
-        synchronized (cleanerThread) {
-            cleanerThread.shutdownStart = true;
-            cleanerThread.notifyAll();
-            while (!cleanerThread.shutdownComplete) {
-                try {
-                    cleanerThread.wait();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
+        try {
+            synchronized (cleanerThread.threadLock) {
+                cleanerThread.shutdownStart = true;
+                cleanerThread.threadLock.notifyAll();
+                while (!cleanerThread.shutdownComplete) {
+                    cleanerThread.threadLock.wait();
                 }
             }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         }
 
         synchronized (fileInfoMap) {
-            try {
-                for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+            for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+                try {
                     boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
                     sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
                     if (!fileHasBeenDeleted) {
                         ioManager.close(entry.getValue().getFileHandle());
                     }
+                } catch (HyracksDataException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Error flushing file id: " + entry.getKey(), e);
+                    }
                 }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
             }
             fileInfoMap.clear();
         }
@@ -848,8 +864,7 @@
     }
 
     private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
-        for (int i = 0; i < pageMap.length; ++i) {
-            final CacheBucket bucket = pageMap[i];
+        for (final CacheBucket bucket : pageMap) {
             bucket.bucketLock.lock();
             try {
                 CachedPage prev = bucket.cachedPage;
@@ -882,7 +897,7 @@
     private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
             throws HyracksDataException {
         if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
-            int pinCount = -1;
+            int pinCount;
             if (cPage.dirty.get()) {
                 if (flushDirtyPages) {
                     write(cPage);
@@ -935,7 +950,7 @@
 
     @Override
     public void force(int fileId, boolean metadata) throws HyracksDataException {
-        BufferedFileHandle fInfo = null;
+        BufferedFileHandle fInfo;
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
         }
@@ -1016,10 +1031,7 @@
         synchronized (cachedPages) {
             final int cpid = page.getCachedPageId();
             if (cpid < cachedPages.size()) {
-                ICachedPageInternal old = cachedPages.set(cpid, page);
-                if (DEBUG) {
-                    assert old == null;
-                }
+                cachedPages.set(cpid, page);
             } else {
                 if (cpid > cachedPages.size()) {
                     // 4 > 1 -> [exiting, null, null, null, new]
@@ -1135,7 +1147,7 @@
 
     @Override
     public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
-        return confiscatePage(dpid, () -> pageReplacementStrategy.findVictim());
+        return confiscatePage(dpid, pageReplacementStrategy::findVictim);
     }
 
     @Override
@@ -1145,7 +1157,7 @@
 
     private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier)
             throws HyracksDataException {
-        while (true) {
+        for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
             int startCleanedCount = cleanerThread.cleanedCount;
             ICachedPage returnPage = null;
             CachedPage victim = (CachedPage) victimSupplier.get();
@@ -1238,8 +1250,13 @@
                 return returnPage;
             }
             // no page available to confiscate. try kicking the cleaner thread.
-            synchronized (cleanerThread) {
-                pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+            synchronized (cleanerThread.threadLock) {
+                try {
+                    pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+                } catch (InterruptedException e) {
+                    // Re-interrupt the thread so this gets handled later
+                    Thread.currentThread().interrupt();
+                }
             }
             // Heuristic optimization. Check whether the cleaner thread has
             // cleaned pages since we did our last pin attempt.
@@ -1250,12 +1267,18 @@
             }
             synchronized (cleanerThread.cleanNotification) {
                 try {
-                    cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                    // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+                    do {
+                        cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                    } while (false);
                 } catch (InterruptedException e) {
-                    // Do nothing
+                    // Re-interrupt the thread so this gets handled later
+                    Thread.currentThread().interrupt();
                 }
             }
         }
+        throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
+                " cycles (buffer cache undersized?)");
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
index 276e26a..ceb9c36 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.storage.common.buffercache;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 public class DelayPageCleanerPolicy implements IPageCleanerPolicy {
     private long delay;
 
@@ -28,21 +26,16 @@
     }
 
     @Override
-    public void notifyCleanCycleStart(Object monitor) throws HyracksDataException {
-
+    public void notifyCleanCycleStart(Object monitor) throws InterruptedException {
     }
 
     @Override
-    public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException {
-        try {
-            monitor.wait(delay);
-        } catch (InterruptedException e) {
-            throw new HyracksDataException(e);
-        }
+    public void notifyCleanCycleFinish(Object monitor) throws InterruptedException {
+        monitor.wait(delay);
     }
 
     @Override
-    public void notifyVictimNotFound(Object monitor) throws HyracksDataException {
+    public void notifyVictimNotFound(Object monitor) {
         monitor.notifyAll();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
index 897ae75..e62547c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hyracks.storage.common.buffercache;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 /**
  * Allows customization of the page cleaning strategy by the cleaner thread.
  *
@@ -31,25 +29,22 @@
      *
      * @param monitor
      *            - The monitor on which a mutex is held while in this call
-     * @throws HyracksDataException
      */
-    public void notifyCleanCycleStart(Object monitor) throws HyracksDataException;
+    void notifyCleanCycleStart(Object monitor) throws InterruptedException;
 
     /**
      * Callback from the cleaner just after the finish of a cleaning cycle.
      *
      * @param monitor
      *            - The monitor on which a mutex is held while in this call.
-     * @throws HyracksDataException
      */
-    public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException;
+    void notifyCleanCycleFinish(Object monitor) throws InterruptedException;
 
     /**
      * Callback to indicate that no victim was found.
      *
      * @param monitor
      *            - The monitor on which a mutex is held while in this call.
-     * @throws HyracksDataException
      */
-    public void notifyVictimNotFound(Object monitor) throws HyracksDataException;
+    void notifyVictimNotFound(Object monitor) throws InterruptedException;
 }