ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles

Update exhaustion logic to be two-tiered:
- emit warning when cycle count exceeds warning threshold (3)
- fail if cycle count reaches the failure threshold (1000)

Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1038
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 080c76f..1b75f94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -37,7 +37,6 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -57,7 +56,8 @@
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
-    private static final int MAX_PIN_ATTEMPT_CYCLES = 3;
+    private static final int PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD = 3;
+    private static final int MAX_PIN_ATTEMPT_CYCLES = 1000;
     public static final boolean DEBUG = false;
 
     private final int pageSize;
@@ -107,7 +107,7 @@
         closed = false;
 
         fifoWriter = new AsyncFIFOPageQueueManager(this);
-        if( DEBUG ) {
+        if ( DEBUG ) {
             confiscatedPages = new ArrayList<>();
             confiscatedPagesOwner = new HashMap<>();
             confiscateLock = new ReentrantLock();
@@ -200,7 +200,7 @@
                             throw new IllegalStateException();
                         }
                     }
-                } finally{
+                } finally {
                     confiscateLock.unlock();
                 }
             }
@@ -216,267 +216,227 @@
             cPage.valid = true;
         }
         pageReplacementStrategy.notifyCachePageAccess(cPage);
-        if(DEBUG){
+        if (DEBUG){
             pinnedPageOwner.put(cPage, Thread.currentThread().getStackTrace());
         }
         return cPage;
     }
 
     private CachedPage findPage(long dpid) throws HyracksDataException {
+        return (CachedPage) getPageLoop(dpid, -1, false);
+    }
 
-        for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
-            int startCleanedCount = cleanerThread.cleanedCount;
-
-            CachedPage cPage = null;
+    private ICachedPage findPageInner(long dpid) {
+        CachedPage cPage;
+        /*
+         * Hash dpid to get a bucket and then check if the page exists in
+         * the bucket.
+         */
+        int hash = hash(dpid);
+        CacheBucket bucket = pageMap[hash];
+        bucket.bucketLock.lock();
+        try {
+            cPage = bucket.cachedPage;
+            while (cPage != null) {
+                if (DEBUG) {
+                    assert bucket.cachedPage != bucket.cachedPage.next;
+                }
+                if (cPage.dpid == dpid) {
+                    if (DEBUG) {
+                        assert !cPage.confiscated.get();
+                    }
+                    cPage.pinCount.incrementAndGet();
+                    return cPage;
+                }
+                cPage = cPage.next;
+            }
+        } finally {
+            bucket.bucketLock.unlock();
+        }
+        /*
+         * If we got here, the page was not in the hash table. Now we ask
+         * the page replacement strategy to find us a victim.
+         */
+        CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
+        if (victim == null) {
+            return null;
+        }
+        /*
+         * We have a victim with the following invariants. 1. The dpid
+         * on the CachedPage may or may not be valid. 2. We have a pin
+         * on the CachedPage. We have to deal with three cases here.
+         * Case 1: The dpid on the CachedPage is invalid (-1). This
+         * indicates that this buffer has never been used or is a
+         * confiscated page. So we are the only ones holding it. Get a lock
+         * on the required dpid's hash bucket, check if someone inserted
+         * the page we want into the table. If so, decrement the
+         * pincount on the victim and return the winner page in the
+         * table. If such a winner does not exist, insert the victim and
+         * return it. Case 2: The dpid on the CachedPage is valid. Case
+         * 2a: The current dpid and required dpid hash to the same
+         * bucket. Get the bucket lock, check that the victim is still
+         * at pinCount == 1 If so check if there is a winning CachedPage
+         * with the required dpid. If so, decrement the pinCount on the
+         * victim and return the winner. If not, update the contents of
+         * the CachedPage to hold the required dpid and return it. If
+         * the picCount on the victim was != 1 or CachedPage was dirty
+         * someone used the victim for its old contents -- Decrement the
+         * pinCount and retry. Case 2b: The current dpid and required
+         * dpid hash to different buckets. Get the two bucket locks in
+         * the order of the bucket indexes (Ordering prevents
+         * deadlocks). Check for the existence of a winner in the new
+         * bucket and for potential use of the victim (pinCount != 1).
+         * If everything looks good, remove the CachedPage from the old
+         * bucket, and add it to the new bucket and update its header
+         * with the new dpid.
+         */
+        if (victim.dpid < 0) {
             /*
-             * Hash dpid to get a bucket and then check if the page exists in
-             * the bucket.
+             * Case 1.
              */
-            int hash = hash(dpid);
-            CacheBucket bucket = pageMap[hash];
             bucket.bucketLock.lock();
             try {
-                cPage = bucket.cachedPage;
-                while (cPage != null) {
-                    if(DEBUG) {
-                        assert bucket.cachedPage != bucket.cachedPage.next;
-                    }
-                    if (cPage.dpid == dpid) {
-                        if(DEBUG) {
-                            assert !cPage.confiscated.get();
-                        }
-                        cPage.pinCount.incrementAndGet();
-                        return cPage;
-                    }
-                    cPage = cPage.next;
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
                 }
+                // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+                // pin count and try again
+                if (victim.dpid >= 0) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                if (DEBUG) {
+                    confiscateLock.lock();
+                    try {
+                        if (confiscatedPages.contains(victim)) {
+                            throw new IllegalStateException();
+                        }
+                    } finally {
+                        confiscateLock.unlock();
+                    }
+                }
+                cPage = findTargetInBucket(dpid, bucket.cachedPage, victim);
+                if (cPage != null) {
+                    return cPage;
+                }
+                victim.reset(dpid);
+                victim.next = bucket.cachedPage;
+                bucket.cachedPage = victim;
             } finally {
                 bucket.bucketLock.unlock();
             }
-            /*
-             * If we got here, the page was not in the hash table. Now we ask
-             * the page replacement strategy to find us a victim.
-             */
-            CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
-            if (victim != null) {
-                /*
-                 * We have a victim with the following invariants. 1. The dpid
-                 * on the CachedPage may or may not be valid. 2. We have a pin
-                 * on the CachedPage. We have to deal with three cases here.
-                 * Case 1: The dpid on the CachedPage is invalid (-1). This
-                 * indicates that this buffer has never been used or is a
-                 * confiscated page. So we are the only ones holding it. Get a lock
-                 * on the required dpid's hash bucket, check if someone inserted
-                 * the page we want into the table. If so, decrement the
-                 * pincount on the victim and return the winner page in the
-                 * table. If such a winner does not exist, insert the victim and
-                 * return it. Case 2: The dpid on the CachedPage is valid. Case
-                 * 2a: The current dpid and required dpid hash to the same
-                 * bucket. Get the bucket lock, check that the victim is still
-                 * at pinCount == 1 If so check if there is a winning CachedPage
-                 * with the required dpid. If so, decrement the pinCount on the
-                 * victim and return the winner. If not, update the contents of
-                 * the CachedPage to hold the required dpid and return it. If
-                 * the picCount on the victim was != 1 or CachedPage was dirty
-                 * someone used the victim for its old contents -- Decrement the
-                 * pinCount and retry. Case 2b: The current dpid and required
-                 * dpid hash to different buckets. Get the two bucket locks in
-                 * the order of the bucket indexes (Ordering prevents
-                 * deadlocks). Check for the existence of a winner in the new
-                 * bucket and for potential use of the victim (pinCount != 1).
-                 * If everything looks good, remove the CachedPage from the old
-                 * bucket, and add it to the new bucket and update its header
-                 * with the new dpid.
-                 */
-                if (victim.dpid < 0) {
-                    /*
-                     * Case 1.
-                     */
-                    bucket.bucketLock.lock();
-                    try {
-                        if (!victim.pinCount.compareAndSet(0, 1)) {
-                            continue;
-                        }
-                        // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
-                        // pin count and try again
-                        if (victim.dpid >= 0) {
-                            victim.pinCount.decrementAndGet();
-                            continue;
-                        }
-                        if (DEBUG) {
-                            confiscateLock.lock();
-                            try{
-                                if (confiscatedPages.contains(victim)) {
-                                    throw new IllegalStateException();
-                                }
-                            } finally{
-                                confiscateLock.unlock();
-                            }
-                        }
-                        cPage = bucket.cachedPage;
-                        while (cPage != null) {
-                            if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
-                                if(DEBUG) {
-                                    assert !cPage.confiscated.get();
-                                }
-                                return cPage;
-                            }
-                            cPage = cPage.next;
-                        }
-                        victim.reset(dpid);
-                        victim.next = bucket.cachedPage;
-                        bucket.cachedPage = victim;
-                    } finally {
-                        bucket.bucketLock.unlock();
-                    }
 
-                    if(DEBUG) {
-                        assert !victim.confiscated.get();
-                    }
-                    return victim;
-                }
-                int victimHash = hash(victim.dpid);
-                if (victimHash == hash) {
-                    /*
-                     * Case 2a.
-                     */
-                    bucket.bucketLock.lock();
-                    try {
-                        if (!victim.pinCount.compareAndSet(0, 1)) {
-                            continue;
-                        }
-                        // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
-                        // pin count and try again
-                        if (victimHash != hash(victim.dpid)) {
-                            victim.pinCount.decrementAndGet();
-                            continue;
-                        }
-                        if (DEBUG) {
-                            confiscateLock.lock();
-                            try{
-                                if (confiscatedPages.contains(victim)) {
-                                    throw new IllegalStateException();
-                                }
-                            }finally{
-                                confiscateLock.unlock();
-                            }
-                        }
-                        cPage = bucket.cachedPage;
-                        while (cPage != null) {
-                            if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
-                                if(DEBUG) {
-                                    assert !victim.confiscated.get();
-                                }
-                                return cPage;
-                            }
-                            cPage = cPage.next;
-                        }
-                        victim.reset(dpid);
-                    } finally {
-                        bucket.bucketLock.unlock();
-                    }
-                    if(DEBUG) {
-                        assert !victim.confiscated.get();
-                    }
-                    return victim;
-                } else {
-                    /*
-                     * Case 2b.
-                     */
-                    CacheBucket victimBucket = pageMap[victimHash];
-                    if (victimHash < hash) {
-                        victimBucket.bucketLock.lock();
-                        bucket.bucketLock.lock();
-                    } else {
-                        bucket.bucketLock.lock();
-                        victimBucket.bucketLock.lock();
-                    }
-                    try {
-                        if (!victim.pinCount.compareAndSet(0, 1)) {
-                            continue;
-                        }
-                        // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
-                        // pin count and try again
-                        if (victimHash != hash(victim.dpid)) {
-                            victim.pinCount.decrementAndGet();
-                            continue;
-                        }
-                        if (DEBUG) {
-                            if (confiscatedPages.contains(victim)) {
-                                throw new IllegalStateException();
-                            }
-                        }
-                        cPage = bucket.cachedPage;
-                        while (cPage != null) {
-                            if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
-                                if(DEBUG) {
-                                    assert !cPage.confiscated.get();
-                                }
-                                return cPage;
-                            }
-                            cPage = cPage.next;
-                        }
-                        if (victimBucket.cachedPage == victim) {
-                            victimBucket.cachedPage = victim.next;
-                        } else {
-                            CachedPage victimPrev = victimBucket.cachedPage;
-                            while (victimPrev.next != victim) {
-                                victimPrev = victimPrev.next;
-                                if (victimPrev == null) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-                            victimPrev.next = victim.next;
-                        }
-                        victim.reset(dpid);
-                        victim.next = bucket.cachedPage;
-                        bucket.cachedPage = victim;
-                    } finally {
-                        victimBucket.bucketLock.unlock();
-                        bucket.bucketLock.unlock();
-                    }
-                    if(DEBUG) {
-                        assert !victim.confiscated.get();
-                    }
-                    return victim;
-                }
+            if (DEBUG) {
+                assert !victim.confiscated.get();
             }
-            synchronized (cleanerThread.threadLock) {
-                try {
-                    pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            // Heuristic optimization. Check whether the cleaner thread has
-            // cleaned pages since we did our last pin attempt.
-            if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
-                // Don't go to sleep and wait for notification from the cleaner,
-                // just try to pin again immediately.
-                continue;
-            }
-            synchronized (cleanerThread.cleanNotification) {
-                try {
-                    // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
-                    do {
-                        cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
-                    } while (false);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            finishQueue();
+            return victim;
         }
-        throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
-                " cycles (buffer cache undersized?)");
+        int victimHash = hash(victim.dpid);
+        if (victimHash == hash) {
+        /*
+         * Case 2a.
+         */
+            bucket.bucketLock.lock();
+            try {
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
+                }
+                // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+                // pin count and try again
+                if (victimHash != hash(victim.dpid)) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                if (DEBUG) {
+                    confiscateLock.lock();
+                    try {
+                        if (confiscatedPages.contains(victim)) {
+                            throw new IllegalStateException();
+                        }
+                    } finally {
+                        confiscateLock.unlock();
+                    }
+                }
+                cPage = findTargetInBucket(dpid, bucket.cachedPage, victim);
+                if (cPage != null) {
+                    return cPage;
+                }
+                victim.reset(dpid);
+            } finally {
+                bucket.bucketLock.unlock();
+            }
+            if (DEBUG) {
+                assert !victim.confiscated.get();
+            }
+            return victim;
+        } else {
+        /*
+         * Case 2b.
+         */
+            CacheBucket victimBucket = pageMap[victimHash];
+            if (victimHash < hash) {
+                victimBucket.bucketLock.lock();
+                bucket.bucketLock.lock();
+            } else {
+                bucket.bucketLock.lock();
+                victimBucket.bucketLock.lock();
+            }
+            try {
+                if (!victim.pinCount.compareAndSet(0, 1)) {
+                    return null;
+                }
+                // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+                // pin count and try again
+                if (victimHash != hash(victim.dpid)) {
+                    victim.pinCount.decrementAndGet();
+                    return null;
+                }
+                if (DEBUG && confiscatedPages.contains(victim)) {
+                    throw new IllegalStateException();
+                }
+                cPage = findTargetInBucket(dpid, bucket.cachedPage, victim);
+                if (cPage != null) {
+                    return cPage;
+                }
+                if (victimBucket.cachedPage == victim) {
+                    victimBucket.cachedPage = victim.next;
+                } else {
+                    CachedPage victimPrev = victimBucket.cachedPage;
+                    while (victimPrev.next != victim) {
+                        victimPrev = victimPrev.next;
+                        if (victimPrev == null) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                    victimPrev.next = victim.next;
+                }
+                victim.reset(dpid);
+                victim.next = bucket.cachedPage;
+                bucket.cachedPage = victim;
+            } finally {
+                victimBucket.bucketLock.unlock();
+                bucket.bucketLock.unlock();
+            }
+            if (DEBUG) {
+                assert !victim.confiscated.get();
+            }
+            return victim;
+        }
+    }
+
+    private CachedPage findTargetInBucket(long dpid, CachedPage cPage, CachedPage victim) {
+        while (cPage != null) {
+            if (cPage.dpid == dpid) {
+                cPage.pinCount.incrementAndGet();
+                victim.pinCount.decrementAndGet();
+                if (DEBUG) {
+                    assert !cPage.confiscated.get();
+                }
+                break;
+            }
+            cPage = cPage.next;
+        }
+        return cPage;
     }
 
     private String dumpState() {
@@ -494,30 +454,30 @@
             cb.bucketLock.lock();
             try {
                 CachedPage cp = cb.cachedPage;
-                if (cp != null) {
-                    buffer.append("   ").append(i).append('\n');
-                    while (cp != null) {
-                        buffer.append("      ").append(cp.cpid).append(" -> [")
-                                .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
-                                .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
-                                .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
-                                .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
-                                .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
-                        cp = cp.next;
-                        ++nCachedPages;
-                    }
+                if (cp == null) {
+                    continue;
+                }
+                buffer.append("   ").append(i).append('\n');
+                while (cp != null) {
+                    buffer.append("      ").append(cp.cpid).append(" -> [")
+                            .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+                            .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+                            .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                            .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
+                            .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+                    cp = cp.next;
+                    ++nCachedPages;
                 }
             } finally {
                 cb.bucketLock.unlock();
             }
         }
         buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
-        if(DEBUG){
+        if (DEBUG){
             confiscateLock.lock();
-            try{
+            try {
                 buffer.append("Number of confiscated pages: ").append(confiscatedPages.size()).append('\n');
-            }
-            finally{
+            } finally {
                 confiscateLock.unlock();
             }
         }
@@ -534,13 +494,13 @@
                         c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
                     return false;
                 }
-                if(c.valid){
+                if (c.valid){
                     reachableDpids.add(c.dpid);
                 }
             }
         }
         for(Long l: reachableDpids){
-            if(!canFindValidCachedPage(l)){
+            if (!canFindValidCachedPage(l)){
                 return false;
             }
         }
@@ -918,12 +878,10 @@
                     }
                 }
                 // Take care of the head of the chain.
-                if (bucket.cachedPage != null) {
-                    if (invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
-                        CachedPage cPage = bucket.cachedPage;
-                        bucket.cachedPage = bucket.cachedPage.next;
-                        cPage.next = null;
-                    }
+                if (bucket.cachedPage != null && invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
+                    CachedPage cPage = bucket.cachedPage;
+                    bucket.cachedPage = bucket.cachedPage.next;
+                    cPage.next = null;
                 }
             } finally {
                 bucket.bucketLock.unlock();
@@ -1115,30 +1073,26 @@
                 boolean found = false;
                 //traverse the bucket's linked list to find the victim.
                 while (curr != null) {
-                    if (curr == victim) { // we found where the victim
+                    if (curr == victim) {
+                        // we found where the victim
                         // resides in the hash table
-                        // if this is the first page in the bucket
+                        if (DEBUG) {
+                            assert curr != curr.next;
+                        }
                         if (prev == null) {
-                            if(DEBUG) {
-                                assert curr != curr.next;
-                            }
-                            bucket.cachedPage = bucket.cachedPage.next;
-                            found = true;
-                            break;
+                            // if this is the first page in the bucket
+                            bucket.cachedPage = curr.next;
+                        } else {
                             // if it isn't we need to make the previous
                             // node point to where it should
-                        } else {
-                            if(DEBUG) {
-                                assert curr.next != curr;
-                            }
                             prev.next = curr.next;
-                            curr.next = null;
-                            if(DEBUG) {
+                            if (DEBUG) {
                                 assert prev.next != prev;
                             }
-                            found = true;
-                            break;
                         }
+                        curr.next = null;
+                        found = true;
+                        break;
                     }
                     // go to the next entry
                     prev = curr;
@@ -1170,7 +1124,7 @@
             if (fInfo == null) {
                 throw new HyracksDataException("No such file mapped for fileId:" + fileId);
             }
-            if(DEBUG) {
+            if (DEBUG) {
                 assert ioManager.getSize(fInfo.getFileHandle()) % getPageSizeWithHeader() == 0;
             }
             return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSizeWithHeader());
@@ -1184,141 +1138,162 @@
 
     @Override
     public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
-        return confiscatePage(dpid, pageReplacementStrategy::findVictim);
+        return confiscatePage(dpid, 1);
     }
 
     @Override
     public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
             throws HyracksDataException {
-        ICachedPage cachedPage = confiscatePage(dpid, () -> pageReplacementStrategy.findVictim(multiplier));
+        ICachedPage cachedPage = confiscatePage(dpid, multiplier);
         ((ICachedPageInternal)cachedPage).setExtraBlockPageId(extraBlockPageId);
         return cachedPage;
     }
 
-    private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier)
+    private ICachedPage confiscatePage(long dpid, int multiplier)
             throws HyracksDataException {
-        for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
-            int startCleanedCount = cleanerThread.cleanedCount;
-            ICachedPage returnPage = null;
-            CachedPage victim = (CachedPage) victimSupplier.get();
-            if (victim != null) {
-                if(DEBUG) {
-                    assert !victim.confiscated.get();
+        return getPageLoop(dpid, multiplier, true);
+    }
+
+    private ICachedPage confiscateInner(long dpid, int multiplier) {
+        ICachedPage returnPage = null;
+        CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(multiplier);
+        if (victim == null) {
+            return victim;
+        }
+        if (DEBUG) {
+            assert !victim.confiscated.get();
+        }
+        // find a page that would possibly be evicted anyway
+        // Case 1 from findPage()
+        if (victim.dpid < 0) { // new page
+            if (!victim.pinCount.compareAndSet(0, 1)) {
+                return null;
+            }
+            // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+            // pin count and try again
+            if (victim.dpid >= 0) {
+                victim.pinCount.decrementAndGet();
+                return null;
+            }
+            returnPage = victim;
+            ((CachedPage) returnPage).dpid = dpid;
+        } else {
+            // Case 2a/b
+            int pageHash = hash(victim.getDiskPageId());
+            CacheBucket bucket = pageMap[pageHash];
+            bucket.bucketLock.lock();
+            try {
+                // readjust the next pointers to remove this page from
+                // the pagemap
+                CachedPage curr = bucket.cachedPage;
+                CachedPage prev = null;
+                boolean found = false;
+                //traverse the bucket's linked list to find the victim.
+                while (curr != null) {
+                    if (curr == victim) { // we found where the victim
+                        // resides in the hash table
+                        if (!victim.pinCount.compareAndSet(0, 1)) {
+                            break;
+                        }
+                        if (DEBUG) {
+                            assert curr != curr.next;
+                        }
+                        if (prev == null) {
+                            // if this is the first page in the bucket
+                            bucket.cachedPage = curr.next;
+                            // if it isn't we need to make the previous
+                            // node point to where it should
+                        } else {
+                            prev.next = curr.next;
+                            if (DEBUG) {
+                                assert prev.next != prev;
+                            }
+                        }
+                        curr.next = null;
+                        found = true;
+                        break;
+                    }
+                    // go to the next entry
+                    prev = curr;
+                    curr = curr.next;
                 }
-                // find a page that would possibly be evicted anyway
-                // Case 1 from findPage()
-                if (victim.dpid < 0) { // new page
-                    if (!victim.pinCount.compareAndSet(0, 1)) {
-                        continue;
-                    }
-                    // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
-                    // pin count and try again
-                    if (victim.dpid >= 0) {
-                        victim.pinCount.decrementAndGet();
-                        continue;
-                    }
+                if (found) {
                     returnPage = victim;
                     ((CachedPage) returnPage).dpid = dpid;
-                } else {
-                    // Case 2a/b
-                    int pageHash = hash(victim.getDiskPageId());
-                    CacheBucket bucket = pageMap[pageHash];
-                    bucket.bucketLock.lock();
-                    try {
-                        // readjust the next pointers to remove this page from
-                        // the pagemap
-                        CachedPage curr = bucket.cachedPage;
-                        CachedPage prev = null;
-                        boolean found = false;
-                        //traverse the bucket's linked list to find the victim.
-                        while (curr != null) {
-                            if (curr == victim) { // we found where the victim
-                                                  // resides in the hash table
-                                if (!victim.pinCount.compareAndSet(0, 1)) {
-                                    break;
-                                }
-                                // if this is the first page in the bucket
-                                if (prev == null) {
-                                    if(DEBUG) {
-                                        assert curr != curr.next;
-                                    }
-                                    bucket.cachedPage = bucket.cachedPage.next;
-                                    found = true;
-                                    break;
-                                    // if it isn't we need to make the previous
-                                    // node point to where it should
-                                } else {
-                                    if(DEBUG) {
-                                        assert curr.next != curr;
-                                    }
-                                    prev.next = curr.next;
-                                    curr.next = null;
-                                    if(DEBUG) {
-                                        assert prev.next != prev;
-                                    }
-                                    found = true;
-                                    break;
-                                }
-                            }
-                            // go to the next entry
-                            prev = curr;
-                            curr = curr.next;
-                        }
-                        if (found) {
-                            returnPage = victim;
-                            ((CachedPage) returnPage).dpid = dpid;
-                        } //otherwise, someone took the same victim before we acquired the lock. try again!
-                    } finally {
-                        bucket.bucketLock.unlock();
-                    }
-                }
+                } //otherwise, someone took the same victim before we acquired the lock. try again!
+            } finally {
+                bucket.bucketLock.unlock();
             }
-            // if we found a page after all that, go ahead and finish
-            if (returnPage != null) {
-                ((CachedPage) returnPage).confiscated.set(true);
-                if (DEBUG) {
-                    confiscateLock.lock();
-                    try{
-                        confiscatedPages.add((CachedPage) returnPage);
-                        confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
-                    }
-                    finally{
-                        confiscateLock.unlock();
-                    }
-                }
-                return returnPage;
-            }
-            // no page available to confiscate. try kicking the cleaner thread.
-            synchronized (cleanerThread.threadLock) {
-                try {
-                    pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            // Heuristic optimization. Check whether the cleaner thread has
-            // cleaned pages since we did our last pin attempt.
-            if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
-                // Don't go to sleep and wait for notification from the cleaner,
-                // just try to pin again immediately.
-                continue;
-            }
-            synchronized (cleanerThread.cleanNotification) {
-                try {
-                    // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
-                    do {
-                        cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
-                    } while (false);
-                } catch (InterruptedException e) {
-                    // Re-interrupt the thread so this gets handled later
-                    Thread.currentThread().interrupt();
-                }
-            }
-            finishQueue();
         }
-        throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
-                " cycles (buffer cache undersized?)");
+        // if we found a page after all that, go ahead and finish
+        if (returnPage != null) {
+            ((CachedPage) returnPage).confiscated.set(true);
+            if (DEBUG) {
+                confiscateLock.lock();
+                try {
+                    confiscatedPages.add((CachedPage) returnPage);
+                    confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
+                } finally {
+                    confiscateLock.unlock();
+                }
+            }
+            return returnPage;
+        }
+        return null;
+    }
+
+    private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate)
+            throws HyracksDataException {
+        int cycleCount = 0;
+        try {
+            while (true) {
+                cycleCount++;
+                int startCleanedCount = cleanerThread.cleanedCount;
+                ICachedPage page = confiscate ? confiscateInner(dpid, multiplier) : findPageInner(dpid);
+                if (page != null) {
+                    return page;
+                }
+                // no page available to confiscate. try kicking the cleaner thread.
+                synchronized (cleanerThread.threadLock) {
+                    try {
+                        pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+                    } catch (InterruptedException e) {
+                        // Re-interrupt the thread so this gets handled later
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                // Heuristic optimization. Check whether the cleaner thread has
+                // cleaned pages since we did our last pin attempt.
+                if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+                    // Don't go to sleep and wait for notification from the cleaner,
+                    // just try to pin again immediately.
+                    continue;
+                }
+                synchronized (cleanerThread.cleanNotification) {
+                    try {
+                        // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+                        // This seemingly pointless loop keeps SonarQube happy
+                        do {
+                            cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+                        } while (false);
+                    } catch (InterruptedException e) {
+                        // Re-interrupt the thread so this gets handled later
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                finishQueue();
+                if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
+                    cycleCount = 0; // suppress warning below
+                    throw new HyracksDataException("Unable to find free page in buffer cache after "
+                            + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)");
+                }
+            }
+        } finally {
+            if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache.  (buffer cache " +
+                        "undersized?)");
+            }
+        }
     }
 
     @Override
@@ -1329,15 +1304,15 @@
     @Override
     public void returnPage(ICachedPage page, boolean reinsert) {
         CachedPage cPage = (CachedPage) page;
-        CacheBucket bucket = null;
-        if(!page.confiscated()){
+        CacheBucket bucket;
+        if (!page.confiscated()){
             return;
         }
         if (reinsert) {
             int hash = hash(cPage.dpid);
             bucket = pageMap[hash];
             bucket.bucketLock.lock();
-            if(DEBUG) {
+            if (DEBUG) {
                 confiscateLock.lock();
             }
             try {
@@ -1346,7 +1321,7 @@
                 cPage.next = bucket.cachedPage;
                 bucket.cachedPage = cPage;
                 cPage.pinCount.decrementAndGet();
-                if(DEBUG){
+                if (DEBUG){
                     assert cPage.pinCount.get() == 0 ;
                     assert cPage.latch.getReadLockCount() == 0;
                     assert cPage.latch.getWriteHoldCount() == 0;
@@ -1355,22 +1330,22 @@
                 }
             } finally {
                 bucket.bucketLock.unlock();
-                if(DEBUG) {
+                if (DEBUG) {
                     confiscateLock.unlock();
                 }
             }
         } else {
             cPage.invalidate();
             cPage.pinCount.decrementAndGet();
-            if(DEBUG){
+            if (DEBUG){
                 assert cPage.pinCount.get() == 0;
                 assert cPage.latch.getReadLockCount() == 0;
                 assert cPage.latch.getWriteHoldCount() == 0;
                 confiscateLock.lock();
-                try{
+                try {
                     confiscatedPages.remove(cPage);
                     confiscatedPagesOwner.remove(cPage);
-                } finally{
+                } finally {
                     confiscateLock.unlock();
                 }
             }
@@ -1421,7 +1396,7 @@
     public void purgeHandle(int fileId) throws HyracksDataException{
         synchronized(fileInfoMap){
                 BufferedFileHandle fh = fileInfoMap.get(fileId);
-                if(fh != null){
+                if (fh != null){
                     ioManager.close(fh.getFileHandle());
                     fileInfoMap.remove(fileId);
                     fileMapManager.unregisterFile(fileId);