BufferCache Concurrency Fixes

1. Fix thread-safety issues in ClockPageReplacementStrategy.findVictimByEviction
2. Fix race-condition between page evicition & file deletion

Change-Id: I01b4ab3000ae6f481f226c0df9fe876c6b16c7aa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/835
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 27d0423..6020d7bfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -283,6 +283,15 @@
                      */
                     bucket.bucketLock.lock();
                     try {
+                        if (!victim.pinCount.compareAndSet(0, 1)) {
+                            continue;
+                        }
+                        // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+                        // pin count and try again
+                        if (victim.dpid >= 0) {
+                            victim.pinCount.decrementAndGet();
+                            continue;
+                        }
                         if (DEBUG) {
                             confiscateLock.lock();
                             try{
@@ -324,7 +333,12 @@
                      */
                     bucket.bucketLock.lock();
                     try {
-                        if (victim.pinCount.get() != 1) {
+                        if (!victim.pinCount.compareAndSet(0, 1)) {
+                            continue;
+                        }
+                        // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+                        // pin count and try again
+                        if (victimHash != hash(victim.dpid)) {
                             victim.pinCount.decrementAndGet();
                             continue;
                         }
@@ -371,7 +385,12 @@
                         victimBucket.bucketLock.lock();
                     }
                     try {
-                        if (victim.pinCount.get() != 1) {
+                        if (!victim.pinCount.compareAndSet(0, 1)) {
+                            continue;
+                        }
+                        // now that we have the pin, ensure the victim's bucket hasn't changed, if it has, decrement
+                        // pin count and try again
+                        if (victimHash != hash(victim.dpid)) {
                             victim.pinCount.decrementAndGet();
                             continue;
                         }
@@ -992,7 +1011,12 @@
                 // find a page that would possibly be evicted anyway
                 // Case 1 from findPage()
                 if (victim.dpid < 0) { // new page
-                    if (victim.pinCount.get() != 1) {
+                    if (!victim.pinCount.compareAndSet(0, 1)) {
+                        continue;
+                    }
+                    // now that we have the pin, ensure the victim's dpid still is < 0, if it's not, decrement
+                    // pin count and try again
+                    if (victim.dpid >= 0) {
                         victim.pinCount.decrementAndGet();
                         continue;
                     }
@@ -1013,8 +1037,7 @@
                         while (curr != null) {
                             if (curr == victim) { // we found where the victim
                                                   // resides in the hash table
-                                if (victim.pinCount.get() != 1) {
-                                    victim.pinCount.decrementAndGet();
+                                if (!victim.pinCount.compareAndSet(0, 1)) {
                                     break;
                                 }
                                 // if this is the first page in the bucket
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 305b577..cda81ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -91,11 +91,11 @@
     }
 
     @Override
-    public boolean pinIfGoodVictim() {
+    public boolean isGoodVictim() {
         if (confiscated.get())
-            return false; //i am not a good victim because i cant flush!
+            return false; // i am not a good victim because i cant flush!
         else {
-            return pinCount.compareAndSet(0, 1);
+            return pinCount.get() == 0;
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
index 6a82b97..5c89bb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
@@ -63,7 +63,7 @@
 
     @Override
     public ICachedPageInternal findVictim() {
-        ICachedPageInternal cachedPage = null;
+        ICachedPageInternal cachedPage;
         if (numPages.get() >= maxAllowedNumPages) {
             cachedPage = findVictimByEviction();
         } else {
@@ -75,31 +75,40 @@
     private ICachedPageInternal findVictimByEviction() {
         //check if we're starved from confiscation
         assert (maxAllowedNumPages > 0);
-        int startClockPtr = clockPtr.get();
+        int clockPtr = advanceClock();
+        int startClockPtr = clockPtr;
+        int lastClockPtr = -1;
         int cycleCount = 0;
-        do {
-            ICachedPageInternal cPage = bufferCache.getPage(clockPtr.get());
+        boolean looped = false;
+        while (true) {
+            ICachedPageInternal cPage = bufferCache.getPage(clockPtr);
 
             /*
              * We do two things here:
              * 1. If the page has been accessed, then we skip it -- The CAS would return
              * false if the current value is false which makes the page a possible candidate
              * for replacement.
-             * 2. We check with the buffer manager if it feels its a good idea to use this
+             * 2. We check with the buffer manager if it feels it's a good idea to use this
              * page as a victim.
              */
             AtomicBoolean accessedFlag = getPerPageObject(cPage);
             if (!accessedFlag.compareAndSet(true, false)) {
-                if (cPage.pinIfGoodVictim()) {
-                        return cPage;
+                if (cPage.isGoodVictim()) {
+                    return cPage;
                 }
             }
-            advanceClock();
-            if (clockPtr.get() == startClockPtr) {
-                ++cycleCount;
+            if (clockPtr < lastClockPtr) {
+                looped = true;
             }
-        } while (cycleCount < MAX_UNSUCCESSFUL_CYCLE_COUNT);
-        return null;
+            if (looped && clockPtr >= startClockPtr) {
+                if (++cycleCount >= MAX_UNSUCCESSFUL_CYCLE_COUNT) {
+                    return null;
+                }
+                looped = false;
+            }
+            lastClockPtr = clockPtr;
+            clockPtr = advanceClock();
+        }
     }
 
     @Override
@@ -113,7 +122,7 @@
         numPages.incrementAndGet();
         AtomicBoolean accessedFlag = getPerPageObject(cPage);
         if (!accessedFlag.compareAndSet(true, false)) {
-            if (cPage.pinIfGoodVictim()) {
+            if (cPage.isGoodVictim()) {
                 return cPage;
             }
         }
@@ -121,17 +130,18 @@
     }
 
     //derived from RoundRobinAllocationPolicy in Apache directmemory
-    private int advanceClock(){
-        boolean clockInDial = false;
-        int newClockPtr = 0;
+    private int advanceClock() {
+
+        boolean clockInDial;
+        int currClockPtr;
         do
         {
-            int currClockPtr = clockPtr.get();
-            newClockPtr = ( currClockPtr + 1 ) % numPages.get();
+            currClockPtr = clockPtr.get();
+            int newClockPtr = ( currClockPtr + 1 ) % numPages.get();
             clockInDial = clockPtr.compareAndSet( currClockPtr, newClockPtr );
         }
         while ( !clockInDial );
-        return newClockPtr;
+        return currClockPtr;
 
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
index 497192d..6a6c2f5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -25,6 +25,6 @@
 
     public Object getReplacementStrategyObject();
 
-    public boolean pinIfGoodVictim();
+    public boolean isGoodVictim();
 
 }