Implemented counting of free pages in BufferCache to better sync the cleaner thread with threads waiting for pages to be cleaned.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1241 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index bbe5ba6..8cdf891 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -38,11 +38,11 @@
     private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
     private static final int MAP_FACTOR = 2;
 
-    //private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
-    private static final int MAX_VICTIMIZATION_TRY_COUNT = 1000000;
+    private static final int MAX_VICTIMIZATION_TRY_COUNT = 5;
+    private static final int MAX_WAIT_FOR_CLEANER_TRY_COUNT = 5;
 
     private final int maxOpenFiles;
-
+    
     private final IIOManager ioManager;
     private final int pageSize;
     private final int numPages;
@@ -51,8 +51,8 @@
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IFileMapManager fileMapManager;
     private final CleanerThread cleanerThread;
-    private final Map<Integer, BufferedFileHandle> fileInfoMap;
-
+    private final Map<Integer, BufferedFileHandle> fileInfoMap;    
+    
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
@@ -120,7 +120,7 @@
             cPage = bucket.cachedPage;
             while (cPage != null) {
                 if (cPage.dpid == dpid) {
-                    cPage.pinCount.incrementAndGet();
+                	cPage.pinCount.incrementAndGet();
                     pageReplacementStrategy.notifyCachePageAccess(cPage);
                     return cPage;
                 }
@@ -168,12 +168,26 @@
         return cPage;
     }
 
-    private CachedPage findPage(long dpid, boolean newPage) {
-        int victimizationTryCount = 0;
-        int localCleanCount = -1;
-        synchronized (cleanerThread.cleanNotification) {
-            localCleanCount = cleanerThread.cleanCount;
+    private int incrementPinCount(CachedPage page) {
+    	int pinCount = page.pinCount.incrementAndGet();
+    	// If this was the first pin, we decrement the freepages count.    	
+    	if (pinCount == 1) {
+    		cleanerThread.freePages.decrementAndGet();
+    	}
+    	return pinCount;
+    }
+    
+    private int decrementPinCount(CachedPage page) {
+    	int pinCount = ((CachedPage) page).pinCount.decrementAndGet();
+        // If this was the last unpin, we increment the freepages count.
+        if (pinCount == 0) {        	
+        	cleanerThread.freePages.incrementAndGet();
         }
+        return pinCount;
+    }
+    
+    private CachedPage findPage(long dpid, boolean newPage) {
+        int victimizationTryCount = 0;        
         while (true) {
             CachedPage cPage = null;
             /*
@@ -186,7 +200,7 @@
                 cPage = bucket.cachedPage;
                 while (cPage != null) {
                     if (cPage.dpid == dpid) {
-                        cPage.pinCount.incrementAndGet();
+                    	cPage.pinCount.incrementAndGet();
                         return cPage;
                     }
                     cPage = cPage.next;
@@ -228,8 +242,8 @@
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
+                            	cPage.pinCount.incrementAndGet();
+                            	victim.pinCount.decrementAndGet();
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -250,14 +264,14 @@
                     bucket.bucketLock.lock();
                     try {
                         if (victim.pinCount.get() != 1) {
-                            victim.pinCount.decrementAndGet();
+                        	victim.pinCount.decrementAndGet();
                             continue;
                         }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
+                            	cPage.pinCount.incrementAndGet();
+                            	victim.pinCount.decrementAndGet();
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -281,14 +295,14 @@
                     }
                     try {
                         if (victim.pinCount.get() != 1) {
-                            victim.pinCount.decrementAndGet();
+                        	victim.pinCount.decrementAndGet();                        	
                             continue;
                         }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
+                            	cPage.pinCount.incrementAndGet();
+                            	victim.pinCount.decrementAndGet();
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -322,14 +336,17 @@
             synchronized (cleanerThread) {
                 cleanerThread.notifyAll();
             }
+            int waitTryCount = 0;
             synchronized (cleanerThread.cleanNotification) {
-                if (cleanerThread.cleanCount == localCleanCount) {
-                    try {
-                        cleanerThread.cleanNotification.wait(1000);
-                    } catch (InterruptedException e) {
-                        // Do nothing
-                    }
-                }
+            	// Sleep until a page becomes available.
+            	do {            		
+            		try {
+            			cleanerThread.cleanNotification.wait(100);
+            		} catch (InterruptedException e) {
+            			// Do nothing
+            		}
+            		waitTryCount++;
+            	} while (cleanerThread.freePages.get() == 0 && waitTryCount < MAX_WAIT_FOR_CLEANER_TRY_COUNT);
             }
         }
     }
@@ -400,7 +417,7 @@
         if (closed) {
             throw new HyracksDataException("unpin called on a closed cache");
         }
-        ((CachedPage) page).pinCount.decrementAndGet();
+        decrementPinCount((CachedPage) page);
     }
 
     private int hash(long dpid) {
@@ -461,7 +478,11 @@
 
         @Override
         public boolean pinIfGoodVictim() {
-            return pinCount.compareAndSet(0, 1);
+            if (pinCount.compareAndSet(0, 1)) {
+            	cleanerThread.freePages.decrementAndGet();
+            	return true;
+            }
+            return false;
         }
 
         @Override
@@ -508,10 +529,11 @@
         private boolean shutdownStart = false;
         private boolean shutdownComplete = false;
         private final Object cleanNotification = new Object();
-        private int cleanCount = 0;
+        private AtomicInteger freePages = new AtomicInteger();
 
         public CleanerThread() {
             setPriority(MAX_PRIORITY);
+            freePages.set(numPages);
         }
 
         public void cleanPage(CachedPage cPage, boolean force) {
@@ -537,10 +559,9 @@
                         }
                         if (cleaned) {                           	                        	
                         	cPage.dirty.set(false);
-                        	cPage.pinCount.decrementAndGet();
+                        	decrementPinCount(cPage);
                         	synchronized (cleanNotification) {
-                        		++cleanCount;
-                        		cleanNotification.notify();
+                        		cleanNotification.notifyAll();
                         	}
                         }
                     } finally {
@@ -708,8 +729,8 @@
                 if (flushDirtyPages) {
                     write(cPage);
                 }
-                cPage.dirty.set(false);
-                pinCount = cPage.pinCount.decrementAndGet();
+                cPage.dirty.set(false);                
+                pinCount = decrementPinCount(cPage);
             } else {
                 pinCount = cPage.pinCount.get();
             }