ASTERIXDB-1516: Unable to find free page in buffer cache after 3 cycles
Update exhaustion logic to be two-tiered:
- emit warning when cycle count exceeds warning threshold (3)
- fail if cycle count reaches the failure threshold (1000)
Change-Id: I46fa6bbda8c2f81e5e570dd6c07e4f4b794ef5bb
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1038
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 080c76f..1b75f94 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -37,7 +37,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -57,7 +56,8 @@
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
- private static final int MAX_PIN_ATTEMPT_CYCLES = 3;
+ private static final int PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD = 3;
+ private static final int MAX_PIN_ATTEMPT_CYCLES = 1000;
public static final boolean DEBUG = false;
private final int pageSize;
@@ -107,7 +107,7 @@
closed = false;
fifoWriter = new AsyncFIFOPageQueueManager(this);
- if( DEBUG ) {
+ if ( DEBUG ) {
confiscatedPages = new ArrayList<>();
confiscatedPagesOwner = new HashMap<>();
confiscateLock = new ReentrantLock();
@@ -200,7 +200,7 @@
throw new IllegalStateException();
}
}
- } finally{
+ } finally {
confiscateLock.unlock();
}
}
@@ -216,267 +216,227 @@
cPage.valid = true;
}
pageReplacementStrategy.notifyCachePageAccess(cPage);
- if(DEBUG){
+ if (DEBUG){
pinnedPageOwner.put(cPage, Thread.currentThread().getStackTrace());
}
return cPage;
}
private CachedPage findPage(long dpid) throws HyracksDataException {
+ return (CachedPage) getPageLoop(dpid, -1, false);
+ }
- for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
- int startCleanedCount = cleanerThread.cleanedCount;
-
- CachedPage cPage = null;
+ private ICachedPage findPageInner(long dpid) {
+ CachedPage cPage;
+ /*
+ * Hash dpid to get a bucket and then check if the page exists in
+ * the bucket.
+ */
+ int hash = hash(dpid);
+ CacheBucket bucket = pageMap[hash];
+ bucket.bucketLock.lock();
+ try {
+ cPage = bucket.cachedPage;
+ while (cPage != null) {
+ if (DEBUG) {
+ assert bucket.cachedPage != bucket.cachedPage.next;
+ }
+ if (cPage.dpid == dpid) {
+ if (DEBUG) {
+ assert !cPage.confiscated.get();
+ }
+ cPage.pinCount.incrementAndGet();
+ return cPage;
+ }
+ cPage = cPage.next;
+ }
+ } finally {
+ bucket.bucketLock.unlock();
+ }
+ /*
+ * If we got here, the page was not in the hash table. Now we ask
+ * the page replacement strategy to find us a victim.
+ */
+ CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
+ if (victim == null) {
+ return null;
+ }
+ /*
+ * We have a victim with the following invariants. 1. The dpid
+ * on the CachedPage may or may not be valid. 2. We have a pin
+ * on the CachedPage. We have to deal with three cases here.
+ * Case 1: The dpid on the CachedPage is invalid (-1). This
+ * indicates that this buffer has never been used or is a
+ * confiscated page. So we are the only ones holding it. Get a lock
+ * on the required dpid's hash bucket, check if someone inserted
+ * the page we want into the table. If so, decrement the
+ * pincount on the victim and return the winner page in the
+ * table. If such a winner does not exist, insert the victim and
+ * return it. Case 2: The dpid on the CachedPage is valid. Case
+ * 2a: The current dpid and required dpid hash to the same
+ * bucket. Get the bucket lock, check that the victim is still
+ * at pinCount == 1 If so check if there is a winning CachedPage
+ * with the required dpid. If so, decrement the pinCount on the
+ * victim and return the winner. If not, update the contents of
+ * the CachedPage to hold the required dpid and return it. If
+ * the picCount on the victim was != 1 or CachedPage was dirty
+ * someone used the victim for its old contents -- Decrement the
+ * pinCount and retry. Case 2b: The current dpid and required
+ * dpid hash to different buckets. Get the two bucket locks in
+ * the order of the bucket indexes (Ordering prevents
+ * deadlocks). Check for the existence of a winner in the new
+ * bucket and for potential use of the victim (pinCount != 1).
+ * If everything looks good, remove the CachedPage from the old
+ * bucket, and add it to the new bucket and update its header
+ * with the new dpid.
+ */
+ if (victim.dpid < 0) {
/*
- * Hash dpid to get a bucket and then check if the page exists in
- * the bucket.
+ * Case 1.
*/
- int hash = hash(dpid);
- CacheBucket bucket = pageMap[hash];
bucket.bucketLock.lock();
try {
- cPage = bucket.cachedPage;
- while (cPage != null) {
- if(DEBUG) {
- assert bucket.cachedPage != bucket.cachedPage.next;
- }
- if (cPage.dpid == dpid) {
- if(DEBUG) {
- assert !cPage.confiscated.get();
- }
- cPage.pinCount.incrementAndGet();
- return cPage;
- }
- cPage = cPage.next;
+ if (!victim.pinCount.compareAndSet(0, 1)) {
+ return null;
}
+ // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+ // pin count and try again
+ if (victim.dpid >= 0) {
+ victim.pinCount.decrementAndGet();
+ return null;
+ }
+ if (DEBUG) {
+ confiscateLock.lock();
+ try {
+ if (confiscatedPages.contains(victim)) {
+ throw new IllegalStateException();
+ }
+ } finally {
+ confiscateLock.unlock();
+ }
+ }
+ cPage = findTargetInBucket(dpid, bucket.cachedPage, victim);
+ if (cPage != null) {
+ return cPage;
+ }
+ victim.reset(dpid);
+ victim.next = bucket.cachedPage;
+ bucket.cachedPage = victim;
} finally {
bucket.bucketLock.unlock();
}
- /*
- * If we got here, the page was not in the hash table. Now we ask
- * the page replacement strategy to find us a victim.
- */
- CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
- if (victim != null) {
- /*
- * We have a victim with the following invariants. 1. The dpid
- * on the CachedPage may or may not be valid. 2. We have a pin
- * on the CachedPage. We have to deal with three cases here.
- * Case 1: The dpid on the CachedPage is invalid (-1). This
- * indicates that this buffer has never been used or is a
- * confiscated page. So we are the only ones holding it. Get a lock
- * on the required dpid's hash bucket, check if someone inserted
- * the page we want into the table. If so, decrement the
- * pincount on the victim and return the winner page in the
- * table. If such a winner does not exist, insert the victim and
- * return it. Case 2: The dpid on the CachedPage is valid. Case
- * 2a: The current dpid and required dpid hash to the same
- * bucket. Get the bucket lock, check that the victim is still
- * at pinCount == 1 If so check if there is a winning CachedPage
- * with the required dpid. If so, decrement the pinCount on the
- * victim and return the winner. If not, update the contents of
- * the CachedPage to hold the required dpid and return it. If
- * the picCount on the victim was != 1 or CachedPage was dirty
- * someone used the victim for its old contents -- Decrement the
- * pinCount and retry. Case 2b: The current dpid and required
- * dpid hash to different buckets. Get the two bucket locks in
- * the order of the bucket indexes (Ordering prevents
- * deadlocks). Check for the existence of a winner in the new
- * bucket and for potential use of the victim (pinCount != 1).
- * If everything looks good, remove the CachedPage from the old
- * bucket, and add it to the new bucket and update its header
- * with the new dpid.
- */
- if (victim.dpid < 0) {
- /*
- * Case 1.
- */
- bucket.bucketLock.lock();
- try {
- if (!victim.pinCount.compareAndSet(0, 1)) {
- continue;
- }
- // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
- // pin count and try again
- if (victim.dpid >= 0) {
- victim.pinCount.decrementAndGet();
- continue;
- }
- if (DEBUG) {
- confiscateLock.lock();
- try{
- if (confiscatedPages.contains(victim)) {
- throw new IllegalStateException();
- }
- } finally{
- confiscateLock.unlock();
- }
- }
- cPage = bucket.cachedPage;
- while (cPage != null) {
- if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
- if(DEBUG) {
- assert !cPage.confiscated.get();
- }
- return cPage;
- }
- cPage = cPage.next;
- }
- victim.reset(dpid);
- victim.next = bucket.cachedPage;
- bucket.cachedPage = victim;
- } finally {
- bucket.bucketLock.unlock();
- }
- if(DEBUG) {
- assert !victim.confiscated.get();
- }
- return victim;
- }
- int victimHash = hash(victim.dpid);
- if (victimHash == hash) {
- /*
- * Case 2a.
- */
- bucket.bucketLock.lock();
- try {
- if (!victim.pinCount.compareAndSet(0, 1)) {
- continue;
- }
- // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
- // pin count and try again
- if (victimHash != hash(victim.dpid)) {
- victim.pinCount.decrementAndGet();
- continue;
- }
- if (DEBUG) {
- confiscateLock.lock();
- try{
- if (confiscatedPages.contains(victim)) {
- throw new IllegalStateException();
- }
- }finally{
- confiscateLock.unlock();
- }
- }
- cPage = bucket.cachedPage;
- while (cPage != null) {
- if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
- if(DEBUG) {
- assert !victim.confiscated.get();
- }
- return cPage;
- }
- cPage = cPage.next;
- }
- victim.reset(dpid);
- } finally {
- bucket.bucketLock.unlock();
- }
- if(DEBUG) {
- assert !victim.confiscated.get();
- }
- return victim;
- } else {
- /*
- * Case 2b.
- */
- CacheBucket victimBucket = pageMap[victimHash];
- if (victimHash < hash) {
- victimBucket.bucketLock.lock();
- bucket.bucketLock.lock();
- } else {
- bucket.bucketLock.lock();
- victimBucket.bucketLock.lock();
- }
- try {
- if (!victim.pinCount.compareAndSet(0, 1)) {
- continue;
- }
- // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
- // pin count and try again
- if (victimHash != hash(victim.dpid)) {
- victim.pinCount.decrementAndGet();
- continue;
- }
- if (DEBUG) {
- if (confiscatedPages.contains(victim)) {
- throw new IllegalStateException();
- }
- }
- cPage = bucket.cachedPage;
- while (cPage != null) {
- if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
- if(DEBUG) {
- assert !cPage.confiscated.get();
- }
- return cPage;
- }
- cPage = cPage.next;
- }
- if (victimBucket.cachedPage == victim) {
- victimBucket.cachedPage = victim.next;
- } else {
- CachedPage victimPrev = victimBucket.cachedPage;
- while (victimPrev.next != victim) {
- victimPrev = victimPrev.next;
- if (victimPrev == null) {
- throw new IllegalStateException();
- }
- }
- victimPrev.next = victim.next;
- }
- victim.reset(dpid);
- victim.next = bucket.cachedPage;
- bucket.cachedPage = victim;
- } finally {
- victimBucket.bucketLock.unlock();
- bucket.bucketLock.unlock();
- }
- if(DEBUG) {
- assert !victim.confiscated.get();
- }
- return victim;
- }
+ if (DEBUG) {
+ assert !victim.confiscated.get();
}
- synchronized (cleanerThread.threadLock) {
- try {
- pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
- } catch (InterruptedException e) {
- // Re-interrupt the thread so this gets handled later
- Thread.currentThread().interrupt();
- }
- }
- // Heuristic optimization. Check whether the cleaner thread has
- // cleaned pages since we did our last pin attempt.
- if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
- // Don't go to sleep and wait for notification from the cleaner,
- // just try to pin again immediately.
- continue;
- }
- synchronized (cleanerThread.cleanNotification) {
- try {
- // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
- do {
- cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
- } while (false);
- } catch (InterruptedException e) {
- // Re-interrupt the thread so this gets handled later
- Thread.currentThread().interrupt();
- }
- }
- finishQueue();
+ return victim;
}
- throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
- " cycles (buffer cache undersized?)");
+ int victimHash = hash(victim.dpid);
+ if (victimHash == hash) {
+ /*
+ * Case 2a.
+ */
+ bucket.bucketLock.lock();
+ try {
+ if (!victim.pinCount.compareAndSet(0, 1)) {
+ return null;
+ }
+ // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+ // pin count and try again
+ if (victimHash != hash(victim.dpid)) {
+ victim.pinCount.decrementAndGet();
+ return null;
+ }
+ if (DEBUG) {
+ confiscateLock.lock();
+ try {
+ if (confiscatedPages.contains(victim)) {
+ throw new IllegalStateException();
+ }
+ } finally {
+ confiscateLock.unlock();
+ }
+ }
+ cPage = findTargetInBucket(dpid, bucket.cachedPage, victim);
+ if (cPage != null) {
+ return cPage;
+ }
+ victim.reset(dpid);
+ } finally {
+ bucket.bucketLock.unlock();
+ }
+ if (DEBUG) {
+ assert !victim.confiscated.get();
+ }
+ return victim;
+ } else {
+ /*
+ * Case 2b.
+ */
+ CacheBucket victimBucket = pageMap[victimHash];
+ if (victimHash < hash) {
+ victimBucket.bucketLock.lock();
+ bucket.bucketLock.lock();
+ } else {
+ bucket.bucketLock.lock();
+ victimBucket.bucketLock.lock();
+ }
+ try {
+ if (!victim.pinCount.compareAndSet(0, 1)) {
+ return null;
+ }
+ // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+ // pin count and try again
+ if (victimHash != hash(victim.dpid)) {
+ victim.pinCount.decrementAndGet();
+ return null;
+ }
+ if (DEBUG && confiscatedPages.contains(victim)) {
+ throw new IllegalStateException();
+ }
+ cPage = findTargetInBucket(dpid, bucket.cachedPage, victim);
+ if (cPage != null) {
+ return cPage;
+ }
+ if (victimBucket.cachedPage == victim) {
+ victimBucket.cachedPage = victim.next;
+ } else {
+ CachedPage victimPrev = victimBucket.cachedPage;
+ while (victimPrev.next != victim) {
+ victimPrev = victimPrev.next;
+ if (victimPrev == null) {
+ throw new IllegalStateException();
+ }
+ }
+ victimPrev.next = victim.next;
+ }
+ victim.reset(dpid);
+ victim.next = bucket.cachedPage;
+ bucket.cachedPage = victim;
+ } finally {
+ victimBucket.bucketLock.unlock();
+ bucket.bucketLock.unlock();
+ }
+ if (DEBUG) {
+ assert !victim.confiscated.get();
+ }
+ return victim;
+ }
+ }
+
+ private CachedPage findTargetInBucket(long dpid, CachedPage cPage, CachedPage victim) {
+ while (cPage != null) {
+ if (cPage.dpid == dpid) {
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
+ if (DEBUG) {
+ assert !cPage.confiscated.get();
+ }
+ break;
+ }
+ cPage = cPage.next;
+ }
+ return cPage;
}
private String dumpState() {
@@ -494,30 +454,30 @@
cb.bucketLock.lock();
try {
CachedPage cp = cb.cachedPage;
- if (cp != null) {
- buffer.append(" ").append(i).append('\n');
- while (cp != null) {
- buffer.append(" ").append(cp.cpid).append(" -> [")
- .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
- .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
- .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
- .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
- .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
- cp = cp.next;
- ++nCachedPages;
- }
+ if (cp == null) {
+ continue;
+ }
+ buffer.append(" ").append(i).append('\n');
+ while (cp != null) {
+ buffer.append(" ").append(cp.cpid).append(" -> [")
+ .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+ .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+ .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+ .append(cp.confiscated.get() ? "confiscated" : "physical").append(", ")
+ .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+ cp = cp.next;
+ ++nCachedPages;
}
} finally {
cb.bucketLock.unlock();
}
}
buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
- if(DEBUG){
+ if (DEBUG){
confiscateLock.lock();
- try{
+ try {
buffer.append("Number of confiscated pages: ").append(confiscatedPages.size()).append('\n');
- }
- finally{
+ } finally {
confiscateLock.unlock();
}
}
@@ -534,13 +494,13 @@
c.latch.getReadLockCount() != 0 || c.latch.getWriteHoldCount() != 0) {
return false;
}
- if(c.valid){
+ if (c.valid){
reachableDpids.add(c.dpid);
}
}
}
for(Long l: reachableDpids){
- if(!canFindValidCachedPage(l)){
+ if (!canFindValidCachedPage(l)){
return false;
}
}
@@ -918,12 +878,10 @@
}
}
// Take care of the head of the chain.
- if (bucket.cachedPage != null) {
- if (invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
- CachedPage cPage = bucket.cachedPage;
- bucket.cachedPage = bucket.cachedPage.next;
- cPage.next = null;
- }
+ if (bucket.cachedPage != null && invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
+ CachedPage cPage = bucket.cachedPage;
+ bucket.cachedPage = bucket.cachedPage.next;
+ cPage.next = null;
}
} finally {
bucket.bucketLock.unlock();
@@ -1115,30 +1073,26 @@
boolean found = false;
//traverse the bucket's linked list to find the victim.
while (curr != null) {
- if (curr == victim) { // we found where the victim
+ if (curr == victim) {
+ // we found where the victim
// resides in the hash table
- // if this is the first page in the bucket
+ if (DEBUG) {
+ assert curr != curr.next;
+ }
if (prev == null) {
- if(DEBUG) {
- assert curr != curr.next;
- }
- bucket.cachedPage = bucket.cachedPage.next;
- found = true;
- break;
+ // if this is the first page in the bucket
+ bucket.cachedPage = curr.next;
+ } else {
// if it isn't we need to make the previous
// node point to where it should
- } else {
- if(DEBUG) {
- assert curr.next != curr;
- }
prev.next = curr.next;
- curr.next = null;
- if(DEBUG) {
+ if (DEBUG) {
assert prev.next != prev;
}
- found = true;
- break;
}
+ curr.next = null;
+ found = true;
+ break;
}
// go to the next entry
prev = curr;
@@ -1170,7 +1124,7 @@
if (fInfo == null) {
throw new HyracksDataException("No such file mapped for fileId:" + fileId);
}
- if(DEBUG) {
+ if (DEBUG) {
assert ioManager.getSize(fInfo.getFileHandle()) % getPageSizeWithHeader() == 0;
}
return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSizeWithHeader());
@@ -1184,141 +1138,162 @@
@Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
- return confiscatePage(dpid, pageReplacementStrategy::findVictim);
+ return confiscatePage(dpid, 1);
}
@Override
public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
throws HyracksDataException {
- ICachedPage cachedPage = confiscatePage(dpid, () -> pageReplacementStrategy.findVictim(multiplier));
+ ICachedPage cachedPage = confiscatePage(dpid, multiplier);
((ICachedPageInternal)cachedPage).setExtraBlockPageId(extraBlockPageId);
return cachedPage;
}
- private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier)
+ private ICachedPage confiscatePage(long dpid, int multiplier)
throws HyracksDataException {
- for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
- int startCleanedCount = cleanerThread.cleanedCount;
- ICachedPage returnPage = null;
- CachedPage victim = (CachedPage) victimSupplier.get();
- if (victim != null) {
- if(DEBUG) {
- assert !victim.confiscated.get();
+ return getPageLoop(dpid, multiplier, true);
+ }
+
+ private ICachedPage confiscateInner(long dpid, int multiplier) {
+ ICachedPage returnPage = null;
+ CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim(multiplier);
+ if (victim == null) {
+ return victim;
+ }
+ if (DEBUG) {
+ assert !victim.confiscated.get();
+ }
+ // find a page that would possibly be evicted anyway
+ // Case 1 from findPage()
+ if (victim.dpid < 0) { // new page
+ if (!victim.pinCount.compareAndSet(0, 1)) {
+ return null;
+ }
+ // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+ // pin count and try again
+ if (victim.dpid >= 0) {
+ victim.pinCount.decrementAndGet();
+ return null;
+ }
+ returnPage = victim;
+ ((CachedPage) returnPage).dpid = dpid;
+ } else {
+ // Case 2a/b
+ int pageHash = hash(victim.getDiskPageId());
+ CacheBucket bucket = pageMap[pageHash];
+ bucket.bucketLock.lock();
+ try {
+ // readjust the next pointers to remove this page from
+ // the pagemap
+ CachedPage curr = bucket.cachedPage;
+ CachedPage prev = null;
+ boolean found = false;
+ //traverse the bucket's linked list to find the victim.
+ while (curr != null) {
+ if (curr == victim) { // we found where the victim
+ // resides in the hash table
+ if (!victim.pinCount.compareAndSet(0, 1)) {
+ break;
+ }
+ if (DEBUG) {
+ assert curr != curr.next;
+ }
+ if (prev == null) {
+ // if this is the first page in the bucket
+ bucket.cachedPage = curr.next;
+ // if it isn't we need to make the previous
+ // node point to where it should
+ } else {
+ prev.next = curr.next;
+ if (DEBUG) {
+ assert prev.next != prev;
+ }
+ }
+ curr.next = null;
+ found = true;
+ break;
+ }
+ // go to the next entry
+ prev = curr;
+ curr = curr.next;
}
- // find a page that would possibly be evicted anyway
- // Case 1 from findPage()
- if (victim.dpid < 0) { // new page
- if (!victim.pinCount.compareAndSet(0, 1)) {
- continue;
- }
- // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
- // pin count and try again
- if (victim.dpid >= 0) {
- victim.pinCount.decrementAndGet();
- continue;
- }
+ if (found) {
returnPage = victim;
((CachedPage) returnPage).dpid = dpid;
- } else {
- // Case 2a/b
- int pageHash = hash(victim.getDiskPageId());
- CacheBucket bucket = pageMap[pageHash];
- bucket.bucketLock.lock();
- try {
- // readjust the next pointers to remove this page from
- // the pagemap
- CachedPage curr = bucket.cachedPage;
- CachedPage prev = null;
- boolean found = false;
- //traverse the bucket's linked list to find the victim.
- while (curr != null) {
- if (curr == victim) { // we found where the victim
- // resides in the hash table
- if (!victim.pinCount.compareAndSet(0, 1)) {
- break;
- }
- // if this is the first page in the bucket
- if (prev == null) {
- if(DEBUG) {
- assert curr != curr.next;
- }
- bucket.cachedPage = bucket.cachedPage.next;
- found = true;
- break;
- // if it isn't we need to make the previous
- // node point to where it should
- } else {
- if(DEBUG) {
- assert curr.next != curr;
- }
- prev.next = curr.next;
- curr.next = null;
- if(DEBUG) {
- assert prev.next != prev;
- }
- found = true;
- break;
- }
- }
- // go to the next entry
- prev = curr;
- curr = curr.next;
- }
- if (found) {
- returnPage = victim;
- ((CachedPage) returnPage).dpid = dpid;
- } //otherwise, someone took the same victim before we acquired the lock. try again!
- } finally {
- bucket.bucketLock.unlock();
- }
- }
+ } //otherwise, someone took the same victim before we acquired the lock. try again!
+ } finally {
+ bucket.bucketLock.unlock();
}
- // if we found a page after all that, go ahead and finish
- if (returnPage != null) {
- ((CachedPage) returnPage).confiscated.set(true);
- if (DEBUG) {
- confiscateLock.lock();
- try{
- confiscatedPages.add((CachedPage) returnPage);
- confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
- }
- finally{
- confiscateLock.unlock();
- }
- }
- return returnPage;
- }
- // no page available to confiscate. try kicking the cleaner thread.
- synchronized (cleanerThread.threadLock) {
- try {
- pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
- } catch (InterruptedException e) {
- // Re-interrupt the thread so this gets handled later
- Thread.currentThread().interrupt();
- }
- }
- // Heuristic optimization. Check whether the cleaner thread has
- // cleaned pages since we did our last pin attempt.
- if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
- // Don't go to sleep and wait for notification from the cleaner,
- // just try to pin again immediately.
- continue;
- }
- synchronized (cleanerThread.cleanNotification) {
- try {
- // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
- do {
- cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
- } while (false);
- } catch (InterruptedException e) {
- // Re-interrupt the thread so this gets handled later
- Thread.currentThread().interrupt();
- }
- }
- finishQueue();
}
- throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
- " cycles (buffer cache undersized?)");
+ // if we found a page after all that, go ahead and finish
+ if (returnPage != null) {
+ ((CachedPage) returnPage).confiscated.set(true);
+ if (DEBUG) {
+ confiscateLock.lock();
+ try {
+ confiscatedPages.add((CachedPage) returnPage);
+ confiscatedPagesOwner.put((CachedPage) returnPage, Thread.currentThread().getStackTrace());
+ } finally {
+ confiscateLock.unlock();
+ }
+ }
+ return returnPage;
+ }
+ return null;
+ }
+
+ private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate)
+ throws HyracksDataException {
+ int cycleCount = 0;
+ try {
+ while (true) {
+ cycleCount++;
+ int startCleanedCount = cleanerThread.cleanedCount;
+ ICachedPage page = confiscate ? confiscateInner(dpid, multiplier) : findPageInner(dpid);
+ if (page != null) {
+ return page;
+ }
+ // no page available to confiscate. try kicking the cleaner thread.
+ synchronized (cleanerThread.threadLock) {
+ try {
+ pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread so this gets handled later
+ Thread.currentThread().interrupt();
+ }
+ }
+ // Heuristic optimization. Check whether the cleaner thread has
+ // cleaned pages since we did our last pin attempt.
+ if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+ // Don't go to sleep and wait for notification from the cleaner,
+ // just try to pin again immediately.
+ continue;
+ }
+ synchronized (cleanerThread.cleanNotification) {
+ try {
+ // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+ // This seemingly pointless loop keeps SonarQube happy
+ do {
+ cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+ } while (false);
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread so this gets handled later
+ Thread.currentThread().interrupt();
+ }
+ }
+ finishQueue();
+ if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
+ cycleCount = 0; // suppress warning below
+ throw new HyracksDataException("Unable to find free page in buffer cache after "
+ + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)");
+ }
+ }
+ } finally {
+ if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache. (buffer cache " +
+ "undersized?)");
+ }
+ }
}
@Override
@@ -1329,15 +1304,15 @@
@Override
public void returnPage(ICachedPage page, boolean reinsert) {
CachedPage cPage = (CachedPage) page;
- CacheBucket bucket = null;
- if(!page.confiscated()){
+ CacheBucket bucket;
+ if (!page.confiscated()){
return;
}
if (reinsert) {
int hash = hash(cPage.dpid);
bucket = pageMap[hash];
bucket.bucketLock.lock();
- if(DEBUG) {
+ if (DEBUG) {
confiscateLock.lock();
}
try {
@@ -1346,7 +1321,7 @@
cPage.next = bucket.cachedPage;
bucket.cachedPage = cPage;
cPage.pinCount.decrementAndGet();
- if(DEBUG){
+ if (DEBUG){
assert cPage.pinCount.get() == 0 ;
assert cPage.latch.getReadLockCount() == 0;
assert cPage.latch.getWriteHoldCount() == 0;
@@ -1355,22 +1330,22 @@
}
} finally {
bucket.bucketLock.unlock();
- if(DEBUG) {
+ if (DEBUG) {
confiscateLock.unlock();
}
}
} else {
cPage.invalidate();
cPage.pinCount.decrementAndGet();
- if(DEBUG){
+ if (DEBUG){
assert cPage.pinCount.get() == 0;
assert cPage.latch.getReadLockCount() == 0;
assert cPage.latch.getWriteHoldCount() == 0;
confiscateLock.lock();
- try{
+ try {
confiscatedPages.remove(cPage);
confiscatedPagesOwner.remove(cPage);
- } finally{
+ } finally {
confiscateLock.unlock();
}
}
@@ -1421,7 +1396,7 @@
public void purgeHandle(int fileId) throws HyracksDataException{
synchronized(fileInfoMap){
BufferedFileHandle fh = fileInfoMap.get(fileId);
- if(fh != null){
+ if (fh != null){
ioManager.close(fh.getFileHandle());
fileInfoMap.remove(fileId);
fileMapManager.unregisterFile(fileId);