Implemented counting of free pages in BufferCache to better sync the cleaner thread with threads waiting for pages to be cleaned.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1241 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index bbe5ba6..8cdf891 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -38,11 +38,11 @@
private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
private static final int MAP_FACTOR = 2;
- //private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
- private static final int MAX_VICTIMIZATION_TRY_COUNT = 1000000;
+ private static final int MAX_VICTIMIZATION_TRY_COUNT = 5;
+ private static final int MAX_WAIT_FOR_CLEANER_TRY_COUNT = 5;
private final int maxOpenFiles;
-
+
private final IIOManager ioManager;
private final int pageSize;
private final int numPages;
@@ -51,8 +51,8 @@
private final IPageReplacementStrategy pageReplacementStrategy;
private final IFileMapManager fileMapManager;
private final CleanerThread cleanerThread;
- private final Map<Integer, BufferedFileHandle> fileInfoMap;
-
+ private final Map<Integer, BufferedFileHandle> fileInfoMap;
+
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
@@ -120,7 +120,7 @@
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
+ cPage.pinCount.incrementAndGet();
pageReplacementStrategy.notifyCachePageAccess(cPage);
return cPage;
}
@@ -168,12 +168,26 @@
return cPage;
}
- private CachedPage findPage(long dpid, boolean newPage) {
- int victimizationTryCount = 0;
- int localCleanCount = -1;
- synchronized (cleanerThread.cleanNotification) {
- localCleanCount = cleanerThread.cleanCount;
+ private int incrementPinCount(CachedPage page) {
+ int pinCount = page.pinCount.incrementAndGet();
+ // If this was the first pin, we decrement the freepages count.
+ if (pinCount == 1) {
+ cleanerThread.freePages.decrementAndGet();
+ }
+ return pinCount;
+ }
+
+ private int decrementPinCount(CachedPage page) {
+ int pinCount = ((CachedPage) page).pinCount.decrementAndGet();
+ // If this was the last unpin, we increment the freepages count.
+ if (pinCount == 0) {
+ cleanerThread.freePages.incrementAndGet();
}
+ return pinCount;
+ }
+
+ private CachedPage findPage(long dpid, boolean newPage) {
+ int victimizationTryCount = 0;
while (true) {
CachedPage cPage = null;
/*
@@ -186,7 +200,7 @@
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
+ cPage.pinCount.incrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -228,8 +242,8 @@
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -250,14 +264,14 @@
bucket.bucketLock.lock();
try {
if (victim.pinCount.get() != 1) {
- victim.pinCount.decrementAndGet();
+ victim.pinCount.decrementAndGet();
continue;
}
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -281,14 +295,14 @@
}
try {
if (victim.pinCount.get() != 1) {
- victim.pinCount.decrementAndGet();
+ victim.pinCount.decrementAndGet();
continue;
}
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -322,14 +336,17 @@
synchronized (cleanerThread) {
cleanerThread.notifyAll();
}
+ int waitTryCount = 0;
synchronized (cleanerThread.cleanNotification) {
- if (cleanerThread.cleanCount == localCleanCount) {
- try {
- cleanerThread.cleanNotification.wait(1000);
- } catch (InterruptedException e) {
- // Do nothing
- }
- }
+ // Sleep until a page becomes available.
+ do {
+ try {
+ cleanerThread.cleanNotification.wait(100);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ waitTryCount++;
+ } while (cleanerThread.freePages.get() == 0 && waitTryCount < MAX_WAIT_FOR_CLEANER_TRY_COUNT);
}
}
}
@@ -400,7 +417,7 @@
if (closed) {
throw new HyracksDataException("unpin called on a closed cache");
}
- ((CachedPage) page).pinCount.decrementAndGet();
+ decrementPinCount((CachedPage) page);
}
private int hash(long dpid) {
@@ -461,7 +478,11 @@
@Override
public boolean pinIfGoodVictim() {
- return pinCount.compareAndSet(0, 1);
+ if (pinCount.compareAndSet(0, 1)) {
+ cleanerThread.freePages.decrementAndGet();
+ return true;
+ }
+ return false;
}
@Override
@@ -508,10 +529,11 @@
private boolean shutdownStart = false;
private boolean shutdownComplete = false;
private final Object cleanNotification = new Object();
- private int cleanCount = 0;
+ private AtomicInteger freePages = new AtomicInteger();
public CleanerThread() {
setPriority(MAX_PRIORITY);
+ freePages.set(numPages);
}
public void cleanPage(CachedPage cPage, boolean force) {
@@ -537,10 +559,9 @@
}
if (cleaned) {
cPage.dirty.set(false);
- cPage.pinCount.decrementAndGet();
+ decrementPinCount(cPage);
synchronized (cleanNotification) {
- ++cleanCount;
- cleanNotification.notify();
+ cleanNotification.notifyAll();
}
}
} finally {
@@ -708,8 +729,8 @@
if (flushDirtyPages) {
write(cPage);
}
- cPage.dirty.set(false);
- pinCount = cPage.pinCount.decrementAndGet();
+ cPage.dirty.set(false);
+ pinCount = decrementPinCount(cPage);
} else {
pinCount = cPage.pinCount.get();
}