Fixed issue 44.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_btree_updates_next@769 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index d7558ba..2fc3929 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -564,8 +564,11 @@
         synchronized (fileInfoMap) {
             try {
                 for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
-                    sweepAndFlush(entry.getKey());
-                    ioManager.close(entry.getValue().getFileHandle());
+                	boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+                    sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);                            
+                    if (!fileHasBeenDeleted) {
+                    	ioManager.close(entry.getValue().getFileHandle());
+                    }
                 }
             } catch(HyracksDataException e) {
                 e.printStackTrace();
@@ -601,9 +604,12 @@
                     for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
                         if(entry.getValue().getReferenceCount() <= 0) {
                             int entryFileId = entry.getKey();
-                            sweepAndFlush(entryFileId);
+                            boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+                            sweepAndFlush(entryFileId, !fileHasBeenDeleted);                            
+                            if (!fileHasBeenDeleted) {
+                            	ioManager.close(entry.getValue().getFileHandle());
+                            }
                             fileInfoMap.remove(entryFileId);
-                            ioManager.close(entry.getValue().getFileHandle());
                             unreferencedFileFound = true;
                             // for-each iterator is invalid because we changed fileInfoMap
                             break;
@@ -626,7 +632,7 @@
         }
     }
         
-    private void sweepAndFlush(int fileId) throws HyracksDataException {
+    private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
         for (int i = 0; i < pageMap.length; ++i) {
             CacheBucket bucket = pageMap[i];
             bucket.bucketLock.lock();
@@ -637,7 +643,7 @@
                     if (cPage == null) {
                         break;
                     }
-                    if (invalidateIfFileIdMatch(fileId, cPage)) {
+                    if (invalidateIfFileIdMatch(fileId, cPage, flushDirtyPages)) {
                         prev.next = cPage.next;
                         cPage.next = null;
                     } else {
@@ -646,7 +652,7 @@
                 }
                 // Take care of the head of the chain.
                 if (bucket.cachedPage != null) {
-                    if (invalidateIfFileIdMatch(fileId, bucket.cachedPage)) {
+                    if (invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
                         CachedPage cPage = bucket.cachedPage;
                         bucket.cachedPage = bucket.cachedPage.next;
                         cPage.next = null;
@@ -658,10 +664,12 @@
         }
     }
 
-    private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage) throws HyracksDataException {
+    private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages) throws HyracksDataException {
         if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
             if (cPage.dirty.get()) {
-                write(cPage);
+				if (flushDirtyPages) {
+					write(cPage);
+				}
                 cPage.dirty.set(false);
                 cPage.pinCount.decrementAndGet();
             }
@@ -697,11 +705,22 @@
             LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
         }
         synchronized (fileInfoMap) {
-            BufferedFileHandle fInfo = fileInfoMap.get(fileId);
-            if (fInfo != null) {
-                throw new HyracksDataException("Deleting open file");
-            }
-            fileMapManager.unregisterFile(fileId);
+        	BufferedFileHandle fInfo = null;
+        	try {
+				fInfo = fileInfoMap.get(fileId);
+				if (fInfo != null && fInfo.getReferenceCount() > 0) {
+					throw new HyracksDataException("Deleting open file");
+				}
+			} finally {
+				fileMapManager.unregisterFile(fileId);
+				if (fInfo != null) {
+					// Mark the fInfo as deleted, 
+					// such that when its pages are reclaimed in openFile(),
+					// the pages are not flushed to disk but only invalidates.
+					ioManager.close(fInfo.getFileHandle());
+					fInfo.markAsDeleted();
+				}
+			}       
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
index 3137f20..ac062d2 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
@@ -37,6 +37,14 @@
         return handle;
     }
 
+    public void markAsDeleted() {
+    	handle = null;
+    }
+    
+    public boolean fileHasBeenDeleted() {
+    	return handle == null;
+    }
+    
     public int incReferenceCount() {
         return refCount.incrementAndGet();
     }
diff --git a/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheRegressionTests.java b/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheRegressionTests.java
new file mode 100644
index 0000000..b8d64ef
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheRegressionTests.java
@@ -0,0 +1,121 @@
+package edu.uci.ics.hyracks.storage.common;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IIOManager.FileReadWriteMode;
+import edu.uci.ics.hyracks.api.io.IIOManager.FileSyncMode;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestUtils;
+
+public class BufferCacheRegressionTests {
+	protected static final String tmpDir = System.getProperty("java.io.tmpdir");
+	protected static final String sep = System.getProperty("file.separator");
+	
+	protected String fileName = tmpDir + sep + "flushTestFile";
+	
+	private static final int PAGE_SIZE = 256;
+    private static final int HYRACKS_FRAME_SIZE = PAGE_SIZE;
+    private IHyracksTaskContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
+		
+	// We want to test the following behavior when reclaiming a file slot in the
+	// buffer cache:
+	// 1. If the file being evicted was deleted, then its dirty pages should be
+	// invalidated, but most not be flushed.
+	// 2. If the file was not deleted, then we must flush its dirty pages.
+    @Test
+	public void testFlushBehaviorOnFileEviction() throws IOException {
+    	File f = new File(fileName);
+		if (f.exists()) {
+			f.delete();
+		}
+    	flushBehaviorTest(true);
+		flushBehaviorTest(false);
+	}
+	
+    private void flushBehaviorTest(boolean deleteFile) throws IOException {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, 10, 1);                
+        
+        IBufferCache bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+        IFileMapProvider fmp = TestStorageManagerComponentHolder.getFileMapProvider(ctx);       
+        
+        FileReference firstFileRef = new FileReference(new File(fileName));
+        bufferCache.createFile(firstFileRef);
+        int firstFileId = fmp.lookupFileId(firstFileRef);
+        bufferCache.openFile(firstFileId);
+        
+        // Fill the first page with known data and make it dirty by write latching it.
+        ICachedPage writePage = bufferCache.pin(BufferedFileHandle.getDiskPageId(firstFileId, 0), true);
+        writePage.acquireWriteLatch();
+        try {
+        	ByteBuffer buf = writePage.getBuffer();
+        	for(int i = 0; i < buf.capacity(); i++) {
+        		buf.put(Byte.MAX_VALUE);
+        	}
+        } finally {
+        	writePage.releaseWriteLatch();
+        	bufferCache.unpin(writePage);
+        }
+		bufferCache.closeFile(firstFileId);
+		if (deleteFile) {
+			bufferCache.deleteFile(firstFileId);
+		}
+        
+        // Create a file with the same name. 
+        FileReference secondFileRef = new FileReference(new File(fileName));
+        bufferCache.createFile(secondFileRef);
+        int secondFileId = fmp.lookupFileId(secondFileRef);
+        
+		// This open will replace the firstFileRef's slot in the BufferCache,
+		// causing it's pages to be cleaned up. We want to make sure that those
+		// dirty pages are not flushed to the disk, because the file was declared as deleted, and
+		// somebody might be already using the same filename again (having been
+		// assigned a different fileId).
+        bufferCache.openFile(secondFileId);
+        
+		// Manually open the file and inspect it's contents. We cannot simply
+		// ask the BufferCache to pin the page, because it would return the same
+		// physical memory again, and for performance reasons pages are never
+		// reset with 0's.
+        IIOManager ioManager = ctx.getIOManager();
+        FileReference testFileRef = new FileReference(new File(fileName));
+        FileHandle testFileHandle = new FileHandle(testFileRef);
+        testFileHandle.open(FileReadWriteMode.READ_ONLY, FileSyncMode.METADATA_SYNC_DATA_SYNC);
+        ByteBuffer testBuffer = ByteBuffer.allocate(PAGE_SIZE);
+        ioManager.syncRead(testFileHandle, 0, testBuffer);
+		for (int i = 0; i < testBuffer.capacity(); i++) {
+			if (deleteFile) {
+				// We deleted the file. We expect to see a clean buffer.
+				if (testBuffer.get(i) == Byte.MAX_VALUE) {
+					fail("Page 0 of deleted file was fazily flushed in openFile(), "
+							+ "corrupting the data of a newly created file with the same name.");
+				}
+			} else {
+				// We didn't delete the file. We expect to see a buffer full of
+				// Byte.MAX_VALUE.
+				if (testBuffer.get(i) != Byte.MAX_VALUE) {
+					fail("Page 0 of closed file was not flushed when properly, when reclaiming the file slot of fileId 0 in the BufferCache.");
+				}
+			}
+		}
+        testFileHandle.close();
+        bufferCache.closeFile(secondFileId);
+        if (deleteFile) {
+        	bufferCache.deleteFile(secondFileId);
+        }
+        bufferCache.close();
+    }
+}