Merged hyracks_dev_next -r 1287 into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-common/.classpath b/hyracks/hyracks-storage-common/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-storage-common/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-storage-common/.project b/hyracks/hyracks-storage-common/.project
deleted file mode 100644
index d990298..0000000
--- a/hyracks/hyracks-storage-common/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>hyracks-storage-common</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-	</natures>
-</projectDescription>
diff --git a/hyracks/hyracks-storage-common/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-storage-common/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 450f5c4..0000000
--- a/hyracks/hyracks-storage-common/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri May 20 19:34:04 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-storage-common/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-storage-common/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index a91dbc3..0000000
--- a/hyracks/hyracks-storage-common/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Tue Aug 24 14:59:44 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-storage-common/pom.xml b/hyracks/hyracks-storage-common/pom.xml
index adaceab..50eecd2 100644
--- a/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks/hyracks-storage-common/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-storage-common</artifactId>
-  <version>0.1.9-SNAPSHOT</version>
+  <version>0.2.0-SNAPSHOT</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.9-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -27,7 +27,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-api</artifactId>
-  		<version>0.1.9-SNAPSHOT</version>
+  		<version>0.2.0-SNAPSHOT</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java
index f64af2e..562305e 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java
@@ -16,12 +16,12 @@
 
 import java.io.Serializable;
 
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public interface IStorageManagerInterface extends Serializable {
-    public IBufferCache getBufferCache(IHyracksStageletContext ctx);
+    public IBufferCache getBufferCache(IHyracksTaskContext ctx);
 
-    public IFileMapProvider getFileMapProvider(IHyracksStageletContext ctx);
+    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index f9ac4d5..a6f1a4b 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.storage.common.buffercache;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,7 +38,9 @@
     private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
     private static final int MAP_FACTOR = 2;
 
-    private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
+    private static final int MAX_VICTIMIZATION_TRY_COUNT = 5;
+    private static final int MAX_WAIT_FOR_CLEANER_THREAD_TIME = 1000;
+    private static final int MIN_CLEANED_COUNT_DIFF = 4;
 
     private final int maxOpenFiles;
     
@@ -49,12 +52,13 @@
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IFileMapManager fileMapManager;
     private final CleanerThread cleanerThread;
-    private final Map<Integer, BufferedFileHandle> fileInfoMap;
-
+    private final Map<Integer, BufferedFileHandle> fileInfoMap;    
+    
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
-            IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+            IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize,
+            int numPages, int maxOpenFiles) {
         this.ioManager = ioManager;
         this.pageSize = pageSize;
         this.numPages = numPages;
@@ -91,21 +95,24 @@
         if (closed) {
             throw new HyracksDataException("pin called on a closed cache");
         }
-        
+
         // check whether file has been created and opened
-        int fileId = BufferedFileHandle.getFileId(dpid);
-        BufferedFileHandle fInfo = fileInfoMap.get(fileId);
-        if(fInfo == null) {
+        int fileId = BufferedFileHandle.getFileId(dpid);        
+        BufferedFileHandle fInfo = null;
+        synchronized(fileInfoMap) {
+        	fInfo = fileInfoMap.get(fileId);
+        }
+        if (fInfo == null) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
-        } else if(fInfo.getReferenceCount() <= 0) {
+        } else if (fInfo.getReferenceCount() <= 0) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
         }
     }
-    
-    @Override 
-    public ICachedPage tryPin(long dpid) throws HyracksDataException {        
+
+    @Override
+    public ICachedPage tryPin(long dpid) throws HyracksDataException {
         pinSanityCheck(dpid);
-        
+
         CachedPage cPage = null;
         int hash = hash(dpid);
         CacheBucket bucket = pageMap[hash];
@@ -114,7 +121,7 @@
             cPage = bucket.cachedPage;
             while (cPage != null) {
                 if (cPage.dpid == dpid) {
-                    cPage.pinCount.incrementAndGet();
+                	cPage.pinCount.incrementAndGet();
                     pageReplacementStrategy.notifyCachePageAccess(cPage);
                     return cPage;
                 }
@@ -123,15 +130,22 @@
         } finally {
             bucket.bucketLock.unlock();
         }
-        
+
         return cPage;
     }
-    
+
     @Override
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         pinSanityCheck(dpid);
-        
+
         CachedPage cPage = findPage(dpid, newPage);
+        if (cPage == null) {
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine(dumpState());
+            }
+            throw new HyracksDataException("Failed to pin page " + BufferedFileHandle.getFileId(dpid) + ":"
+                    + BufferedFileHandle.getPageId(dpid) + " because all pages are pinned.");
+        }
         if (!newPage) {
             if (!cPage.valid) {
                 /*
@@ -156,12 +170,10 @@
     }
 
     private CachedPage findPage(long dpid, boolean newPage) {
-        int victimizationTryCount = 0;
-        int localCleanCount = -1;
-        synchronized (cleanerThread.cleanNotification) {
-            localCleanCount = cleanerThread.cleanCount;
-        }
+        int victimizationTryCount = 0;        
         while (true) {
+        	int startCleanedCount = cleanerThread.cleanedCount;
+        	
             CachedPage cPage = null;
             /*
              * Hash dpid to get a bucket and then check if the page exists in the bucket.
@@ -173,7 +185,7 @@
                 cPage = bucket.cachedPage;
                 while (cPage != null) {
                     if (cPage.dpid == dpid) {
-                        cPage.pinCount.incrementAndGet();
+                    	cPage.pinCount.incrementAndGet();
                         return cPage;
                     }
                     cPage = cPage.next;
@@ -215,8 +227,8 @@
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
+                            	cPage.pinCount.incrementAndGet();
+                            	victim.pinCount.decrementAndGet();
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -237,14 +249,14 @@
                     bucket.bucketLock.lock();
                     try {
                         if (victim.pinCount.get() != 1) {
-                            victim.pinCount.decrementAndGet();
+                        	victim.pinCount.decrementAndGet();
                             continue;
                         }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
+                            	cPage.pinCount.incrementAndGet();
+                            	victim.pinCount.decrementAndGet();
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -268,14 +280,14 @@
                     }
                     try {
                         if (victim.pinCount.get() != 1) {
-                            victim.pinCount.decrementAndGet();
+                        	victim.pinCount.decrementAndGet();                        	
                             continue;
                         }
                         cPage = bucket.cachedPage;
                         while (cPage != null) {
                             if (cPage.dpid == dpid) {
-                                cPage.pinCount.incrementAndGet();
-                                victim.pinCount.decrementAndGet();
+                            	cPage.pinCount.incrementAndGet();
+                            	victim.pinCount.decrementAndGet();
                                 return cPage;
                             }
                             cPage = cPage.next;
@@ -309,18 +321,56 @@
             synchronized (cleanerThread) {
                 cleanerThread.notifyAll();
             }
+			// Heuristic optimization. Check whether the cleaner thread has
+			// cleaned pages since we did our last pin attempt.
+			if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+				// Don't go to sleep and wait for notification from the cleaner,
+				// just try to pin again immediately.
+				continue;
+			}
             synchronized (cleanerThread.cleanNotification) {
-                if (cleanerThread.cleanCount == localCleanCount) {
-                    try {
-                        cleanerThread.cleanNotification.wait(1000);
-                    } catch (InterruptedException e) {
-                        // Do nothing
-                    }
-                }
+            	try {
+            		cleanerThread.cleanNotification.wait(MAX_WAIT_FOR_CLEANER_THREAD_TIME);
+            	} catch (InterruptedException e) {
+            		// Do nothing
+            	}
             }
         }
     }
 
+    private String dumpState() {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("Buffer cache state\n");
+        buffer.append("Page Size: ").append(pageSize).append('\n');
+        buffer.append("Number of physical pages: ").append(numPages).append('\n');
+        buffer.append("Hash table size: ").append(pageMap.length).append('\n');
+        buffer.append("Page Map:\n");
+        int nCachedPages = 0;
+        for (int i = 0; i < pageMap.length; ++i) {
+            CacheBucket cb = pageMap[i];
+            cb.bucketLock.lock();
+            try {
+                CachedPage cp = cb.cachedPage;
+                if (cp != null) {
+                    buffer.append("   ").append(i).append('\n');
+                    while (cp != null) {
+                        buffer.append("      ").append(cp.cpid).append(" -> [")
+                                .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+                                .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+                                .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+                                .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+                        cp = cp.next;
+                        ++nCachedPages;
+                    }
+                }
+            } finally {
+                cb.bucketLock.unlock();
+            }
+        }
+        buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
+        return buffer.toString();
+    }
+
     private void read(CachedPage cPage) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileInfo(cPage);
         cPage.buffer.clear();
@@ -340,6 +390,9 @@
 
     private void write(CachedPage cPage) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileInfo(cPage);
+        if(fInfo.fileHasBeenDeleted()){
+            return;
+        }
         cPage.buffer.position(0);
         cPage.buffer.limit(pageSize);
         ioManager.syncWrite(fInfo.getFileHandle(), (long) BufferedFileHandle.getPageId(cPage.dpid) * pageSize,
@@ -459,43 +512,67 @@
         private boolean shutdownStart = false;
         private boolean shutdownComplete = false;
         private final Object cleanNotification = new Object();
-        private int cleanCount = 0;
+		// Simply keeps incrementing this counter when a page is cleaned.
+		// Used to implement wait-for-cleanerthread heuristic optimizations.
+		// A waiter can detect whether pages have been cleaned.
+		// No need to make this var volatile or synchronize it's access in any
+		// way because it is used for heuristics.
+        private int cleanedCount = 0;
 
         public CleanerThread() {
             setPriority(MAX_PRIORITY);
         }
 
+        public void cleanPage(CachedPage cPage, boolean force) {
+        	if (cPage.dirty.get()) {
+        		boolean proceed = false;
+        		if (force) {        			
+        			cPage.latch.writeLock().lock();
+        			proceed = true;
+        		} else {        			
+        			proceed = cPage.latch.readLock().tryLock();
+        		}
+                if (proceed) {
+                	try {
+                		// Make sure page is still dirty.
+                    	if (!cPage.dirty.get()) {
+                    		return;
+                    	}
+                        boolean cleaned = true;
+                        try {
+                            write(cPage);
+                        } catch (HyracksDataException e) {
+                            cleaned = false;
+                        }
+                        if (cleaned) {                           	                        	
+                        	cPage.dirty.set(false);
+                        	cPage.pinCount.decrementAndGet();
+                        	cleanedCount++;
+                        	synchronized (cleanNotification) {                        		
+                        		cleanNotification.notifyAll();
+                        	}
+                        }
+                    } finally {
+                        if (force) {
+                        	cPage.latch.writeLock().unlock();
+                        } else {
+                        	cPage.latch.readLock().unlock();
+                        }
+                    }
+                } else if (shutdownStart) {
+                    throw new IllegalStateException(
+                            "Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
+                }
+            }
+        }
+        
         @Override
         public synchronized void run() {
             try {
                 while (true) {
-                    for (int i = 0; i < numPages; ++i) {
-                        CachedPage cPage = cachedPages[i];
-                        if (cPage.dirty.get()) {
-                            if (cPage.latch.readLock().tryLock()) {
-                                try {
-                                    boolean cleaned = true;
-                                    try {
-                                        write(cPage);
-                                    } catch (HyracksDataException e) {
-                                        cleaned = false;
-                                    }
-                                    if (cleaned) {
-                                        cPage.dirty.set(false);
-                                        cPage.pinCount.decrementAndGet();
-                                        synchronized (cleanNotification) {
-                                            ++cleanCount;
-                                            cleanNotification.notifyAll();
-                                        }
-                                    }
-                                } finally {
-                                    cPage.latch.readLock().unlock();
-                                }
-                            } else if (shutdownStart) {
-                                throw new IllegalStateException(
-                                        "Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
-                            }
-                        }
+                    for (int i = 0; i < numPages; ++i) {                        
+                    	CachedPage cPage = cachedPages[i];
+                        cleanPage(cPage, false);
                     }
                     if (shutdownStart) {
                         break;
@@ -514,8 +591,8 @@
     }
 
     @Override
-    public void close() {        
-        closed = true;
+    public void close() {
+    	closed = true;
         synchronized (cleanerThread) {
             cleanerThread.shutdownStart = true;
             cleanerThread.notifyAll();
@@ -526,19 +603,22 @@
                     e.printStackTrace();
                 }
             }
-        }    
-        
+        }
+
         synchronized (fileInfoMap) {
             try {
-                for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
-                    sweepAndFlush(entry.getKey());
-                    ioManager.close(entry.getValue().getFileHandle());
+                for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+                    boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+                    sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
+                    if (!fileHasBeenDeleted) {
+                        ioManager.close(entry.getValue().getFileHandle());
+                    }
                 }
-            } catch(HyracksDataException e) {
+            } catch (HyracksDataException e) {
                 e.printStackTrace();
             }
             fileInfoMap.clear();
-        }        
+        }
     }
 
     @Override
@@ -560,28 +640,32 @@
             BufferedFileHandle fInfo;
             fInfo = fileInfoMap.get(fileId);
             if (fInfo == null) {
-                
-                // map is full, make room by removing cleaning up unreferenced files
+
+                // map is full, make room by cleaning up unreferenced files
                 boolean unreferencedFileFound = true;
-                while(fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {                
-                    unreferencedFileFound = false;                    
-                    for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
-                        if(entry.getValue().getReferenceCount() <= 0) {
+                while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
+                    unreferencedFileFound = false;
+                    for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+                        if (entry.getValue().getReferenceCount() <= 0) {
                             int entryFileId = entry.getKey();
-                            sweepAndFlush(entryFileId);
+                            boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+                            sweepAndFlush(entryFileId, !fileHasBeenDeleted);
+                            if (!fileHasBeenDeleted) {
+                                ioManager.close(entry.getValue().getFileHandle());
+                            }
                             fileInfoMap.remove(entryFileId);
-                            ioManager.close(entry.getValue().getFileHandle());
                             unreferencedFileFound = true;
                             // for-each iterator is invalid because we changed fileInfoMap
                             break;
                         }
                     }
                 }
-                
-                if(fileInfoMap.size() >= maxOpenFiles) {
-                    throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files " + maxOpenFiles + " already opened and referenced.");
+
+                if (fileInfoMap.size() >= maxOpenFiles) {
+                    throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files "
+                            + maxOpenFiles + " already opened and referenced.");
                 }
-                
+
                 // create, open, and map new file reference
                 FileReference fileRef = fileMapManager.lookupFileName(fileId);
                 FileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
@@ -592,8 +676,8 @@
             fInfo.incReferenceCount();
         }
     }
-        
-    private void sweepAndFlush(int fileId) throws HyracksDataException {
+
+    private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
         for (int i = 0; i < pageMap.length; ++i) {
             CacheBucket bucket = pageMap[i];
             bucket.bucketLock.lock();
@@ -604,7 +688,7 @@
                     if (cPage == null) {
                         break;
                     }
-                    if (invalidateIfFileIdMatch(fileId, cPage)) {
+                    if (invalidateIfFileIdMatch(fileId, cPage, flushDirtyPages)) {
                         prev.next = cPage.next;
                         cPage.next = null;
                     } else {
@@ -613,7 +697,7 @@
                 }
                 // Take care of the head of the chain.
                 if (bucket.cachedPage != null) {
-                    if (invalidateIfFileIdMatch(fileId, bucket.cachedPage)) {
+                    if (invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
                         CachedPage cPage = bucket.cachedPage;
                         bucket.cachedPage = bucket.cachedPage.next;
                         cPage.next = null;
@@ -625,15 +709,21 @@
         }
     }
 
-    private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage) throws HyracksDataException {
+    private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
+            throws HyracksDataException {
         if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
+            int pinCount = -1;
             if (cPage.dirty.get()) {
-                write(cPage);
-                cPage.dirty.set(false);
-                cPage.pinCount.decrementAndGet();
+                if (flushDirtyPages) {
+                    write(cPage);
+                }
+                cPage.dirty.set(false);                
+                pinCount = cPage.pinCount.decrementAndGet();
+            } else {
+                pinCount = cPage.pinCount.get();
             }
-            if (cPage.pinCount.get() != 0) {
-                throw new IllegalStateException("Page is pinned and file is being closed");
+            if (pinCount != 0) {
+                throw new IllegalStateException("Page is pinned and file is being closed. Pincount is: " + pinCount);
             }
             cPage.invalidate();
             return true;
@@ -646,28 +736,70 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Closing file: " + fileId + " in cache: " + this);
         }
+        if (LOGGER.isLoggable(Level.FINE)) {            
+            LOGGER.fine(dumpState());
+        }
+        
         synchronized (fileInfoMap) {
             BufferedFileHandle fInfo = fileInfoMap.get(fileId);
             if (fInfo == null) {
                 throw new HyracksDataException("Closing unopened file");
             }
             if (fInfo.decReferenceCount() < 0) {
-                throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");                                
+                throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");
             }
         }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Closed file: " + fileId + " in cache: " + this);
+        }
     }
 
     @Override
-    public synchronized void deleteFile(int fileId) throws HyracksDataException {
+    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+    	// Assumes the caller has pinned the page.
+    	cleanerThread.cleanPage((CachedPage)page, true);
+    }
+	
+	@Override
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+    	BufferedFileHandle fInfo = null;
+    	synchronized (fileInfoMap) {
+    		fInfo = fileInfoMap.get(fileId);
+    		try {
+    			fInfo.getFileHandle().getFileChannel().force(metadata);
+    		} catch (IOException e) {
+    			throw new HyracksDataException(e);
+    		}
+    	}    	
+    }
+	
+    @Override
+    public synchronized void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
         }
+        if (flushDirtyPages) {
+        	synchronized (fileInfoMap) {
+        		sweepAndFlush(fileId, flushDirtyPages);
+        	}
+        }
         synchronized (fileInfoMap) {
-            BufferedFileHandle fInfo = fileInfoMap.get(fileId);
-            if (fInfo != null) {
-                throw new HyracksDataException("Deleting open file");
+            BufferedFileHandle fInfo = null;
+            try {
+                fInfo = fileInfoMap.get(fileId);
+                if (fInfo != null && fInfo.getReferenceCount() > 0) {
+                    throw new HyracksDataException("Deleting open file");
+                }
+            } finally {
+                fileMapManager.unregisterFile(fileId);
+                if (fInfo != null) {
+                    // Mark the fInfo as deleted, 
+                    // such that when its pages are reclaimed in openFile(),
+                    // the pages are not flushed to disk but only invalidates.
+                    ioManager.close(fInfo.getFileHandle());
+                    fInfo.markAsDeleted();
+                }
             }
-            fileMapManager.unregisterFile(fileId);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
new file mode 100644
index 0000000..d610c7e
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+
+/**
+ * Implementation of an IBufferCache that counts the number of pins/unpins,
+ * latches/unlatches, and file create/delete/open/close called on it. It
+ * delegates the actual functionality to another IBufferCache set in the c'tor.
+ * The counters are updated in a thread-safe fashion using AtomicLong.
+ */
+public class DebugBufferCache implements IBufferCache {
+
+    // Actual BufferCache functionality is delegated to this bufferCache.
+    private final IBufferCache bufferCache;
+    private AtomicLong pinCount;
+    private AtomicLong unpinCount;
+    private AtomicLong readLatchCount;
+    private AtomicLong readUnlatchCount;
+    private AtomicLong writeLatchCount;
+    private AtomicLong writeUnlatchCount;
+    private AtomicLong createFileCount;
+    private AtomicLong deleteFileCount;
+    private AtomicLong openFileCount;
+    private AtomicLong closeFileCount;
+
+    public DebugBufferCache(IBufferCache bufferCache) {
+        this.bufferCache = bufferCache;
+        resetCounters();
+    }
+
+    @Override
+    public void createFile(FileReference fileRef) throws HyracksDataException {
+        bufferCache.createFile(fileRef);
+        createFileCount.addAndGet(1);
+    }
+
+    @Override
+    public void openFile(int fileId) throws HyracksDataException {
+        bufferCache.openFile(fileId);
+        openFileCount.addAndGet(1);
+    }
+
+    @Override
+    public void closeFile(int fileId) throws HyracksDataException {
+        bufferCache.closeFile(fileId);
+        closeFileCount.addAndGet(1);
+    }
+
+    @Override
+    public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
+        bufferCache.deleteFile(fileId, flushDirtyPages);
+        deleteFileCount.addAndGet(1);
+    }
+
+    @Override
+    public ICachedPage tryPin(long dpid) throws HyracksDataException {
+        return bufferCache.tryPin(dpid);
+    }
+
+    @Override
+    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
+        ICachedPage page = bufferCache.pin(dpid, newPage);
+        pinCount.addAndGet(1);
+        return page;
+    }
+
+    @Override
+    public void unpin(ICachedPage page) throws HyracksDataException {
+        bufferCache.unpin(page);
+        unpinCount.addAndGet(1);
+    }
+
+    @Override
+    public int getPageSize() {
+        return bufferCache.getPageSize();
+    }
+
+    @Override
+    public int getNumPages() {
+        return bufferCache.getNumPages();
+    }
+
+    @Override
+    public void close() {
+        bufferCache.close();
+    }
+
+    public void resetCounters() {
+        pinCount.set(0);
+        unpinCount.set(0);
+        readLatchCount.set(0);
+        readUnlatchCount.set(0);
+        writeLatchCount.set(0);
+        writeUnlatchCount.set(0);
+        createFileCount.set(0);
+        deleteFileCount.set(0);
+        openFileCount.set(0);
+        closeFileCount.set(0);
+    }
+
+    public long getPinCount() {
+        return pinCount.get();
+    }
+
+    public long getUnpinCount() {
+        return unpinCount.get();
+    }
+
+    public long getReadLatchCount() {
+        return readLatchCount.get();
+    }
+
+    public long getReadUnlatchCount() {
+        return readUnlatchCount.get();
+    }
+
+    public long getWriteLatchCount() {
+        return writeLatchCount.get();
+    }
+
+    public long getWriteUnlatchCount() {
+        return writeUnlatchCount.get();
+    }
+
+    public long getCreateFileCount() {
+        return createFileCount.get();
+    }
+
+    public long getDeleteFileCount() {
+        return deleteFileCount.get();
+    }
+
+    public long getOpenFileCount() {
+        return openFileCount.get();
+    }
+
+    public long getCloseFileCount() {
+        return closeFileCount.get();
+    }
+
+	@Override
+	public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+		bufferCache.flushDirtyPage(page);
+	}
+
+	@Override
+	public void force(int fileId, boolean metadata) throws HyracksDataException {
+		bufferCache.force(fileId, metadata);
+	}
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index e82c01b..e8b407e 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -24,14 +24,18 @@
 
     public void closeFile(int fileId) throws HyracksDataException;
 
-    public void deleteFile(int fileId) throws HyracksDataException;
-
+    public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
+    
     public ICachedPage tryPin(long dpid) throws HyracksDataException;
 
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
 
     public void unpin(ICachedPage page) throws HyracksDataException;
 
+    public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
+    
+    public void force(int fileId, boolean metadata) throws HyracksDataException;
+    
     public int getPageSize();
 
     public int getNumPages();
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
index 3137f20..ac062d2 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
@@ -37,6 +37,14 @@
         return handle;
     }
 
+    public void markAsDeleted() {
+    	handle = null;
+    }
+    
+    public boolean fileHasBeenDeleted() {
+    	return handle == null;
+    }
+    
     public int incReferenceCount() {
         return refCount.incrementAndGet();
     }