ASTERIXDB-1438: BufferCache spins indefinitely...
Fixes ASTERIXDB-1438: BufferCache spins indefinitely when cache is exceeded
After three unsuccessful cycles (each traversing clock pages three times)
and waiting for dirty pages to be cleaned, an exception is thrown
Change-Id: I327a7423bd630c96e16601b1a3a2a21f558f9f50
Reviewed-on: https://asterix-gerrit.ics.uci.edu/894
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
index 1bbe6db..114dcb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
@@ -125,6 +125,7 @@
}
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// TODO what do we do here?
e.printStackTrace();
}
@@ -140,6 +141,7 @@
try {
entry = queue.take();
} catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
if (entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()){
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 2b6dd73..201526b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -54,6 +54,7 @@
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
+ private static final int MAX_PIN_ATTEMPT_CYCLES = 3;
public static final boolean DEBUG = false;
private final int pageSize;
@@ -74,7 +75,7 @@
private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner;
//!DEBUG
private IIOReplicationManager ioReplicationManager;
- private List<ICachedPageInternal> cachedPages = new ArrayList<>();
+ private final List<ICachedPageInternal> cachedPages = new ArrayList<>();
private boolean closed;
@@ -94,16 +95,16 @@
this.fileMapManager = fileMapManager;
Executor executor = Executors.newCachedThreadPool(threadFactory);
- fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
- virtualFiles = new HashSet<Integer>();
+ fileInfoMap = new HashMap<>();
+ virtualFiles = new HashSet<>();
cleanerThread = new CleanerThread();
executor.execute(cleanerThread);
closed = false;
fifoWriter = new AsyncFIFOPageQueueManager(this);
if( DEBUG ) {
- confiscatedPages = new ArrayList<CachedPage>();
- confiscatedPagesOwner = new HashMap<CachedPage, StackTraceElement[]>();
+ confiscatedPages = new ArrayList<>();
+ confiscatedPagesOwner = new HashMap<>();
confiscateLock = new ReentrantLock();
pinnedPageOwner = new ConcurrentHashMap<>();
}
@@ -135,7 +136,7 @@
// check whether file has been created and opened
int fileId = BufferedFileHandle.getFileId(dpid);
- BufferedFileHandle fInfo = null;
+ BufferedFileHandle fInfo;
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
}
@@ -185,7 +186,7 @@
if (DEBUG) {
pinSanityCheck(dpid);
}
- CachedPage cPage = findPage(dpid, false);
+ CachedPage cPage = findPage(dpid);
if (!newPage) {
if (DEBUG) {
confiscateLock.lock();
@@ -220,13 +221,9 @@
return cPage;
}
- private boolean isVirtual(long vpid) throws HyracksDataException {
- CachedPage virtPage = findPage(vpid, true);
- return virtPage.confiscated.get();
- }
+ private CachedPage findPage(long dpid) throws HyracksDataException {
- private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
- while (true) {
+ for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
int startCleanedCount = cleanerThread.cleanedCount;
CachedPage cPage = null;
@@ -428,11 +425,11 @@
victimBucket.cachedPage = victim.next;
} else {
CachedPage victimPrev = victimBucket.cachedPage;
- while (victimPrev != null && victimPrev.next != victim) {
+ while (victimPrev.next != victim) {
victimPrev = victimPrev.next;
- }
- if(DEBUG) {
- assert victimPrev != null;
+ if (victimPrev == null) {
+ throw new IllegalStateException();
+ }
}
victimPrev.next = victim.next;
}
@@ -449,8 +446,13 @@
return victim;
}
}
- synchronized (cleanerThread) {
- pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+ synchronized (cleanerThread.threadLock) {
+ try {
+ pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread so this gets handled later
+ Thread.currentThread().interrupt();
+ }
}
// Heuristic optimization. Check whether the cleaner thread has
// cleaned pages since we did our last pin attempt.
@@ -461,12 +463,18 @@
}
synchronized (cleanerThread.cleanNotification) {
try {
- cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+ // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+ do {
+ cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+ } while (false);
} catch (InterruptedException e) {
- // Do nothing
+ // Re-interrupt the thread so this gets handled later
+ Thread.currentThread().interrupt();
}
}
}
+ throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
+ " cycles (buffer cache undersized?)");
}
private String dumpState() {
@@ -651,21 +659,15 @@
}
}
- private class CleanerThread extends Thread {
- private boolean shutdownStart = false;
- private boolean shutdownComplete = false;
+ private class CleanerThread implements Runnable {
+ private volatile boolean shutdownStart = false;
+ private volatile boolean shutdownComplete = false;
+ private final Object threadLock = new Object();
private final Object cleanNotification = new Object();
// Simply keeps incrementing this counter when a page is cleaned.
// Used to implement wait-for-cleanerthread heuristic optimizations.
// A waiter can detect whether pages have been cleaned.
- // No need to make this var volatile or synchronize it's access in any
- // way because it is used for heuristics.
- private int cleanedCount = 0;
-
- public CleanerThread() {
- setPriority(Thread.NORM_PRIORITY);
- setDaemon(true);
- }
+ private volatile int cleanedCount = 0;
public void cleanPage(CachedPage cPage, boolean force) {
if (cPage.dirty.get() && !cPage.confiscated.get()) {
@@ -678,24 +680,7 @@
}
if (proceed) {
try {
- // Make sure page is still dirty.
- if (!cPage.dirty.get()) {
- return;
- }
- boolean cleaned = true;
- try {
- write(cPage);
- } catch (HyracksDataException e) {
- cleaned = false;
- }
- if (cleaned) {
- cPage.dirty.set(false);
- cPage.pinCount.decrementAndGet();
- cleanedCount++;
- synchronized (cleanNotification) {
- cleanNotification.notifyAll();
- }
- }
+ cleanPageLocked(cPage);
} finally {
if (force) {
cPage.latch.writeLock().unlock();
@@ -710,34 +695,63 @@
}
}
- @Override
- public synchronized void run() {
+ private void cleanPageLocked(CachedPage cPage) {
+ // Make sure page is still dirty.
+ if (!cPage.dirty.get()) {
+ return;
+ }
+ boolean cleaned = true;
try {
- while (true) {
- pageCleanerPolicy.notifyCleanCycleStart(this);
- int curPage = 0;
- while (true) {
- synchronized (cachedPages) {
- if (curPage >= cachedPages.size()) {
- break;
- }
- CachedPage cPage = (CachedPage) cachedPages.get(curPage);
- if (cPage != null) {
- cleanPage(cPage, false);
- }
- }
- curPage++;
+ write(cPage);
+ } catch (HyracksDataException e) {
+ LOGGER.log(Level.WARNING, "Unable to write dirty page", e);
+ cleaned = false;
+ }
+ if (cleaned) {
+ cPage.dirty.set(false);
+ cPage.pinCount.decrementAndGet();
+ // this increment of a volatile is OK as there is only one writer
+ cleanedCount++;
+ synchronized (cleanNotification) {
+ cleanNotification.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ synchronized (threadLock) {
+ try {
+ while (!shutdownStart) {
+ runCleanCycle();
}
- if (shutdownStart) {
+ } catch (InterruptedException e) {
+
+ Thread.currentThread().interrupt();
+ } finally {
+ shutdownComplete = true;
+ threadLock.notifyAll();
+ }
+ }
+ }
+
+ private void runCleanCycle() throws InterruptedException {
+ pageCleanerPolicy.notifyCleanCycleStart(threadLock);
+ int curPage = 0;
+ while (true) {
+ synchronized (cachedPages) {
+ if (curPage >= cachedPages.size()) {
break;
}
- pageCleanerPolicy.notifyCleanCycleFinish(this);
+ CachedPage cPage = (CachedPage) cachedPages.get(curPage);
+ if (cPage != null) {
+ cleanPage(cPage, false);
+ }
}
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- shutdownComplete = true;
- notifyAll();
+ curPage++;
+ }
+ if (!shutdownStart) {
+ pageCleanerPolicy.notifyCleanCycleFinish(threadLock);
}
}
}
@@ -746,29 +760,31 @@
public void close() {
closed = true;
fifoWriter.destroyQueue();
- synchronized (cleanerThread) {
- cleanerThread.shutdownStart = true;
- cleanerThread.notifyAll();
- while (!cleanerThread.shutdownComplete) {
- try {
- cleanerThread.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
+ try {
+ synchronized (cleanerThread.threadLock) {
+ cleanerThread.shutdownStart = true;
+ cleanerThread.threadLock.notifyAll();
+ while (!cleanerThread.shutdownComplete) {
+ cleanerThread.threadLock.wait();
}
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
synchronized (fileInfoMap) {
- try {
- for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ try {
boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
if (!fileHasBeenDeleted) {
ioManager.close(entry.getValue().getFileHandle());
}
+ } catch (HyracksDataException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Error flushing file id: " + entry.getKey(), e);
+ }
}
- } catch (HyracksDataException e) {
- e.printStackTrace();
}
fileInfoMap.clear();
}
@@ -848,8 +864,7 @@
}
private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
- for (int i = 0; i < pageMap.length; ++i) {
- final CacheBucket bucket = pageMap[i];
+ for (final CacheBucket bucket : pageMap) {
bucket.bucketLock.lock();
try {
CachedPage prev = bucket.cachedPage;
@@ -882,7 +897,7 @@
private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
throws HyracksDataException {
if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
- int pinCount = -1;
+ int pinCount;
if (cPage.dirty.get()) {
if (flushDirtyPages) {
write(cPage);
@@ -935,7 +950,7 @@
@Override
public void force(int fileId, boolean metadata) throws HyracksDataException {
- BufferedFileHandle fInfo = null;
+ BufferedFileHandle fInfo;
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
}
@@ -1016,10 +1031,7 @@
synchronized (cachedPages) {
final int cpid = page.getCachedPageId();
if (cpid < cachedPages.size()) {
- ICachedPageInternal old = cachedPages.set(cpid, page);
- if (DEBUG) {
- assert old == null;
- }
+ cachedPages.set(cpid, page);
} else {
if (cpid > cachedPages.size()) {
// 4 > 1 -> [exiting, null, null, null, new]
@@ -1135,7 +1147,7 @@
@Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
- return confiscatePage(dpid, () -> pageReplacementStrategy.findVictim());
+ return confiscatePage(dpid, pageReplacementStrategy::findVictim);
}
@Override
@@ -1145,7 +1157,7 @@
private ICachedPage confiscatePage(long dpid, Supplier<ICachedPageInternal> victimSupplier)
throws HyracksDataException {
- while (true) {
+ for (int i = 0; i < MAX_PIN_ATTEMPT_CYCLES; i++) {
int startCleanedCount = cleanerThread.cleanedCount;
ICachedPage returnPage = null;
CachedPage victim = (CachedPage) victimSupplier.get();
@@ -1238,8 +1250,13 @@
return returnPage;
}
// no page available to confiscate. try kicking the cleaner thread.
- synchronized (cleanerThread) {
- pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
+ synchronized (cleanerThread.threadLock) {
+ try {
+ pageCleanerPolicy.notifyVictimNotFound(cleanerThread.threadLock);
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread so this gets handled later
+ Thread.currentThread().interrupt();
+ }
}
// Heuristic optimization. Check whether the cleaner thread has
// cleaned pages since we did our last pin attempt.
@@ -1250,12 +1267,18 @@
}
synchronized (cleanerThread.cleanNotification) {
try {
- cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+ // it's OK to not loop on this wait, as we do not rely on any condition to be true on notify
+ do {
+ cleanerThread.cleanNotification.wait(PIN_MAX_WAIT_TIME);
+ } while (false);
} catch (InterruptedException e) {
- // Do nothing
+ // Re-interrupt the thread so this gets handled later
+ Thread.currentThread().interrupt();
}
}
}
+ throw new HyracksDataException("Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES +
+ " cycles (buffer cache undersized?)");
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
index 276e26a..ceb9c36 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.storage.common.buffercache;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
public class DelayPageCleanerPolicy implements IPageCleanerPolicy {
private long delay;
@@ -28,21 +26,16 @@
}
@Override
- public void notifyCleanCycleStart(Object monitor) throws HyracksDataException {
-
+ public void notifyCleanCycleStart(Object monitor) throws InterruptedException {
}
@Override
- public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException {
- try {
- monitor.wait(delay);
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
+ public void notifyCleanCycleFinish(Object monitor) throws InterruptedException {
+ monitor.wait(delay);
}
@Override
- public void notifyVictimNotFound(Object monitor) throws HyracksDataException {
+ public void notifyVictimNotFound(Object monitor) {
monitor.notifyAll();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
index 897ae75..e62547c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
@@ -18,8 +18,6 @@
*/
package org.apache.hyracks.storage.common.buffercache;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
/**
* Allows customization of the page cleaning strategy by the cleaner thread.
*
@@ -31,25 +29,22 @@
*
* @param monitor
* - The monitor on which a mutex is held while in this call
- * @throws HyracksDataException
*/
- public void notifyCleanCycleStart(Object monitor) throws HyracksDataException;
+ void notifyCleanCycleStart(Object monitor) throws InterruptedException;
/**
* Callback from the cleaner just after the finish of a cleaning cycle.
*
* @param monitor
* - The monitor on which a mutex is held while in this call.
- * @throws HyracksDataException
*/
- public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException;
+ void notifyCleanCycleFinish(Object monitor) throws InterruptedException;
/**
* Callback to indicate that no victim was found.
*
* @param monitor
* - The monitor on which a mutex is held while in this call.
- * @throws HyracksDataException
*/
- public void notifyVictimNotFound(Object monitor) throws HyracksDataException;
+ void notifyVictimNotFound(Object monitor) throws InterruptedException;
}