Merged hyracks_dev_next -r 1287 into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-common/.classpath b/hyracks/hyracks-storage-common/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-storage-common/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
- <classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-storage-common/.project b/hyracks/hyracks-storage-common/.project
deleted file mode 100644
index d990298..0000000
--- a/hyracks/hyracks-storage-common/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
- <name>hyracks-storage-common</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.maven.ide.eclipse.maven2Builder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
- <nature>org.eclipse.jdt.core.javanature</nature>
- </natures>
-</projectDescription>
diff --git a/hyracks/hyracks-storage-common/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-storage-common/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 450f5c4..0000000
--- a/hyracks/hyracks-storage-common/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri May 20 19:34:04 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-storage-common/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-storage-common/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index a91dbc3..0000000
--- a/hyracks/hyracks-storage-common/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Tue Aug 24 14:59:44 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-storage-common/pom.xml b/hyracks/hyracks-storage-common/pom.xml
index adaceab..50eecd2 100644
--- a/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks/hyracks-storage-common/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-common</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
</parent>
<build>
@@ -27,7 +27,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java
index f64af2e..562305e 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/IStorageManagerInterface.java
@@ -16,12 +16,12 @@
import java.io.Serializable;
-import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public interface IStorageManagerInterface extends Serializable {
- public IBufferCache getBufferCache(IHyracksStageletContext ctx);
+ public IBufferCache getBufferCache(IHyracksTaskContext ctx);
- public IFileMapProvider getFileMapProvider(IHyracksStageletContext ctx);
+ public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index f9ac4d5..a6f1a4b 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.storage.common.buffercache;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -37,7 +38,9 @@
private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
private static final int MAP_FACTOR = 2;
- private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
+ private static final int MAX_VICTIMIZATION_TRY_COUNT = 5;
+ private static final int MAX_WAIT_FOR_CLEANER_THREAD_TIME = 1000;
+ private static final int MIN_CLEANED_COUNT_DIFF = 4;
private final int maxOpenFiles;
@@ -49,12 +52,13 @@
private final IPageReplacementStrategy pageReplacementStrategy;
private final IFileMapManager fileMapManager;
private final CleanerThread cleanerThread;
- private final Map<Integer, BufferedFileHandle> fileInfoMap;
-
+ private final Map<Integer, BufferedFileHandle> fileInfoMap;
+
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
- IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+ IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize,
+ int numPages, int maxOpenFiles) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.numPages = numPages;
@@ -91,21 +95,24 @@
if (closed) {
throw new HyracksDataException("pin called on a closed cache");
}
-
+
// check whether file has been created and opened
- int fileId = BufferedFileHandle.getFileId(dpid);
- BufferedFileHandle fInfo = fileInfoMap.get(fileId);
- if(fInfo == null) {
+ int fileId = BufferedFileHandle.getFileId(dpid);
+ BufferedFileHandle fInfo = null;
+ synchronized(fileInfoMap) {
+ fInfo = fileInfoMap.get(fileId);
+ }
+ if (fInfo == null) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
- } else if(fInfo.getReferenceCount() <= 0) {
+ } else if (fInfo.getReferenceCount() <= 0) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
}
}
-
- @Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
+
+ @Override
+ public ICachedPage tryPin(long dpid) throws HyracksDataException {
pinSanityCheck(dpid);
-
+
CachedPage cPage = null;
int hash = hash(dpid);
CacheBucket bucket = pageMap[hash];
@@ -114,7 +121,7 @@
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
+ cPage.pinCount.incrementAndGet();
pageReplacementStrategy.notifyCachePageAccess(cPage);
return cPage;
}
@@ -123,15 +130,22 @@
} finally {
bucket.bucketLock.unlock();
}
-
+
return cPage;
}
-
+
@Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
pinSanityCheck(dpid);
-
+
CachedPage cPage = findPage(dpid, newPage);
+ if (cPage == null) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(dumpState());
+ }
+ throw new HyracksDataException("Failed to pin page " + BufferedFileHandle.getFileId(dpid) + ":"
+ + BufferedFileHandle.getPageId(dpid) + " because all pages are pinned.");
+ }
if (!newPage) {
if (!cPage.valid) {
/*
@@ -156,12 +170,10 @@
}
private CachedPage findPage(long dpid, boolean newPage) {
- int victimizationTryCount = 0;
- int localCleanCount = -1;
- synchronized (cleanerThread.cleanNotification) {
- localCleanCount = cleanerThread.cleanCount;
- }
+ int victimizationTryCount = 0;
while (true) {
+ int startCleanedCount = cleanerThread.cleanedCount;
+
CachedPage cPage = null;
/*
* Hash dpid to get a bucket and then check if the page exists in the bucket.
@@ -173,7 +185,7 @@
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
+ cPage.pinCount.incrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -215,8 +227,8 @@
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -237,14 +249,14 @@
bucket.bucketLock.lock();
try {
if (victim.pinCount.get() != 1) {
- victim.pinCount.decrementAndGet();
+ victim.pinCount.decrementAndGet();
continue;
}
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -268,14 +280,14 @@
}
try {
if (victim.pinCount.get() != 1) {
- victim.pinCount.decrementAndGet();
+ victim.pinCount.decrementAndGet();
continue;
}
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- victim.pinCount.decrementAndGet();
+ cPage.pinCount.incrementAndGet();
+ victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
@@ -309,18 +321,56 @@
synchronized (cleanerThread) {
cleanerThread.notifyAll();
}
+ // Heuristic optimization. Check whether the cleaner thread has
+ // cleaned pages since we did our last pin attempt.
+ if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+ // Don't go to sleep and wait for notification from the cleaner,
+ // just try to pin again immediately.
+ continue;
+ }
synchronized (cleanerThread.cleanNotification) {
- if (cleanerThread.cleanCount == localCleanCount) {
- try {
- cleanerThread.cleanNotification.wait(1000);
- } catch (InterruptedException e) {
- // Do nothing
- }
- }
+ try {
+ cleanerThread.cleanNotification.wait(MAX_WAIT_FOR_CLEANER_THREAD_TIME);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
}
}
}
+ private String dumpState() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("Buffer cache state\n");
+ buffer.append("Page Size: ").append(pageSize).append('\n');
+ buffer.append("Number of physical pages: ").append(numPages).append('\n');
+ buffer.append("Hash table size: ").append(pageMap.length).append('\n');
+ buffer.append("Page Map:\n");
+ int nCachedPages = 0;
+ for (int i = 0; i < pageMap.length; ++i) {
+ CacheBucket cb = pageMap[i];
+ cb.bucketLock.lock();
+ try {
+ CachedPage cp = cb.cachedPage;
+ if (cp != null) {
+ buffer.append(" ").append(i).append('\n');
+ while (cp != null) {
+ buffer.append(" ").append(cp.cpid).append(" -> [")
+ .append(BufferedFileHandle.getFileId(cp.dpid)).append(':')
+ .append(BufferedFileHandle.getPageId(cp.dpid)).append(", ").append(cp.pinCount.get())
+ .append(", ").append(cp.valid ? "valid" : "invalid").append(", ")
+ .append(cp.dirty.get() ? "dirty" : "clean").append("]\n");
+ cp = cp.next;
+ ++nCachedPages;
+ }
+ }
+ } finally {
+ cb.bucketLock.unlock();
+ }
+ }
+ buffer.append("Number of cached pages: ").append(nCachedPages).append('\n');
+ return buffer.toString();
+ }
+
private void read(CachedPage cPage) throws HyracksDataException {
BufferedFileHandle fInfo = getFileInfo(cPage);
cPage.buffer.clear();
@@ -340,6 +390,9 @@
private void write(CachedPage cPage) throws HyracksDataException {
BufferedFileHandle fInfo = getFileInfo(cPage);
+ if(fInfo.fileHasBeenDeleted()){
+ return;
+ }
cPage.buffer.position(0);
cPage.buffer.limit(pageSize);
ioManager.syncWrite(fInfo.getFileHandle(), (long) BufferedFileHandle.getPageId(cPage.dpid) * pageSize,
@@ -459,43 +512,67 @@
private boolean shutdownStart = false;
private boolean shutdownComplete = false;
private final Object cleanNotification = new Object();
- private int cleanCount = 0;
+        // Incremented each time a page is cleaned.
+        // Used to implement wait-for-cleaner-thread heuristic optimizations:
+        // a waiter can detect whether pages have been cleaned.
+        // There is no need to make this variable volatile or to synchronize its
+        // access in any way, because it is only used for heuristics.
+ private int cleanedCount = 0;
public CleanerThread() {
setPriority(MAX_PRIORITY);
}
+ public void cleanPage(CachedPage cPage, boolean force) {
+ if (cPage.dirty.get()) {
+ boolean proceed = false;
+ if (force) {
+ cPage.latch.writeLock().lock();
+ proceed = true;
+ } else {
+ proceed = cPage.latch.readLock().tryLock();
+ }
+ if (proceed) {
+ try {
+ // Make sure page is still dirty.
+ if (!cPage.dirty.get()) {
+ return;
+ }
+ boolean cleaned = true;
+ try {
+ write(cPage);
+ } catch (HyracksDataException e) {
+ cleaned = false;
+ }
+ if (cleaned) {
+ cPage.dirty.set(false);
+ cPage.pinCount.decrementAndGet();
+ cleanedCount++;
+ synchronized (cleanNotification) {
+ cleanNotification.notifyAll();
+ }
+ }
+ } finally {
+ if (force) {
+ cPage.latch.writeLock().unlock();
+ } else {
+ cPage.latch.readLock().unlock();
+ }
+ }
+ } else if (shutdownStart) {
+ throw new IllegalStateException(
+ "Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
+ }
+ }
+ }
+
@Override
public synchronized void run() {
try {
while (true) {
- for (int i = 0; i < numPages; ++i) {
- CachedPage cPage = cachedPages[i];
- if (cPage.dirty.get()) {
- if (cPage.latch.readLock().tryLock()) {
- try {
- boolean cleaned = true;
- try {
- write(cPage);
- } catch (HyracksDataException e) {
- cleaned = false;
- }
- if (cleaned) {
- cPage.dirty.set(false);
- cPage.pinCount.decrementAndGet();
- synchronized (cleanNotification) {
- ++cleanCount;
- cleanNotification.notifyAll();
- }
- }
- } finally {
- cPage.latch.readLock().unlock();
- }
- } else if (shutdownStart) {
- throw new IllegalStateException(
- "Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
- }
- }
+ for (int i = 0; i < numPages; ++i) {
+ CachedPage cPage = cachedPages[i];
+ cleanPage(cPage, false);
}
if (shutdownStart) {
break;
@@ -514,8 +591,8 @@
}
@Override
- public void close() {
- closed = true;
+ public void close() {
+ closed = true;
synchronized (cleanerThread) {
cleanerThread.shutdownStart = true;
cleanerThread.notifyAll();
@@ -526,19 +603,22 @@
e.printStackTrace();
}
}
- }
-
+ }
+
synchronized (fileInfoMap) {
try {
- for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
- sweepAndFlush(entry.getKey());
- ioManager.close(entry.getValue().getFileHandle());
+ for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+ sweepAndFlush(entry.getKey(), !fileHasBeenDeleted);
+ if (!fileHasBeenDeleted) {
+ ioManager.close(entry.getValue().getFileHandle());
+ }
}
- } catch(HyracksDataException e) {
+ } catch (HyracksDataException e) {
e.printStackTrace();
}
fileInfoMap.clear();
- }
+ }
}
@Override
@@ -560,28 +640,32 @@
BufferedFileHandle fInfo;
fInfo = fileInfoMap.get(fileId);
if (fInfo == null) {
-
- // map is full, make room by removing cleaning up unreferenced files
+
+ // map is full, make room by cleaning up unreferenced files
boolean unreferencedFileFound = true;
- while(fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
- unreferencedFileFound = false;
- for(Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
- if(entry.getValue().getReferenceCount() <= 0) {
+ while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
+ unreferencedFileFound = false;
+ for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ if (entry.getValue().getReferenceCount() <= 0) {
int entryFileId = entry.getKey();
- sweepAndFlush(entryFileId);
+ boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
+ sweepAndFlush(entryFileId, !fileHasBeenDeleted);
+ if (!fileHasBeenDeleted) {
+ ioManager.close(entry.getValue().getFileHandle());
+ }
fileInfoMap.remove(entryFileId);
- ioManager.close(entry.getValue().getFileHandle());
unreferencedFileFound = true;
// for-each iterator is invalid because we changed fileInfoMap
break;
}
}
}
-
- if(fileInfoMap.size() >= maxOpenFiles) {
- throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files " + maxOpenFiles + " already opened and referenced.");
+
+ if (fileInfoMap.size() >= maxOpenFiles) {
+ throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files "
+ + maxOpenFiles + " already opened and referenced.");
}
-
+
// create, open, and map new file reference
FileReference fileRef = fileMapManager.lookupFileName(fileId);
FileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
@@ -592,8 +676,8 @@
fInfo.incReferenceCount();
}
}
-
- private void sweepAndFlush(int fileId) throws HyracksDataException {
+
+ private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException {
for (int i = 0; i < pageMap.length; ++i) {
CacheBucket bucket = pageMap[i];
bucket.bucketLock.lock();
@@ -604,7 +688,7 @@
if (cPage == null) {
break;
}
- if (invalidateIfFileIdMatch(fileId, cPage)) {
+ if (invalidateIfFileIdMatch(fileId, cPage, flushDirtyPages)) {
prev.next = cPage.next;
cPage.next = null;
} else {
@@ -613,7 +697,7 @@
}
// Take care of the head of the chain.
if (bucket.cachedPage != null) {
- if (invalidateIfFileIdMatch(fileId, bucket.cachedPage)) {
+ if (invalidateIfFileIdMatch(fileId, bucket.cachedPage, flushDirtyPages)) {
CachedPage cPage = bucket.cachedPage;
bucket.cachedPage = bucket.cachedPage.next;
cPage.next = null;
@@ -625,15 +709,21 @@
}
}
- private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage) throws HyracksDataException {
+ private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
+ throws HyracksDataException {
if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
+ int pinCount = -1;
if (cPage.dirty.get()) {
- write(cPage);
- cPage.dirty.set(false);
- cPage.pinCount.decrementAndGet();
+ if (flushDirtyPages) {
+ write(cPage);
+ }
+ cPage.dirty.set(false);
+ pinCount = cPage.pinCount.decrementAndGet();
+ } else {
+ pinCount = cPage.pinCount.get();
}
- if (cPage.pinCount.get() != 0) {
- throw new IllegalStateException("Page is pinned and file is being closed");
+ if (pinCount != 0) {
+ throw new IllegalStateException("Page is pinned and file is being closed. Pincount is: " + pinCount);
}
cPage.invalidate();
return true;
@@ -646,28 +736,70 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Closing file: " + fileId + " in cache: " + this);
}
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(dumpState());
+ }
+
synchronized (fileInfoMap) {
BufferedFileHandle fInfo = fileInfoMap.get(fileId);
if (fInfo == null) {
throw new HyracksDataException("Closing unopened file");
}
if (fInfo.decReferenceCount() < 0) {
- throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");
+ throw new HyracksDataException("Closed fileId: " + fileId + " more times than it was opened.");
}
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Closed file: " + fileId + " in cache: " + this);
+ }
}
@Override
- public synchronized void deleteFile(int fileId) throws HyracksDataException {
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ // Assumes the caller has pinned the page.
+ cleanerThread.cleanPage((CachedPage)page, true);
+ }
+
+ @Override
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ BufferedFileHandle fInfo = null;
+ synchronized (fileInfoMap) {
+ fInfo = fileInfoMap.get(fileId);
+ try {
+ fInfo.getFileHandle().getFileChannel().force(metadata);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
}
+ if (flushDirtyPages) {
+ synchronized (fileInfoMap) {
+ sweepAndFlush(fileId, flushDirtyPages);
+ }
+ }
synchronized (fileInfoMap) {
- BufferedFileHandle fInfo = fileInfoMap.get(fileId);
- if (fInfo != null) {
- throw new HyracksDataException("Deleting open file");
+ BufferedFileHandle fInfo = null;
+ try {
+ fInfo = fileInfoMap.get(fileId);
+ if (fInfo != null && fInfo.getReferenceCount() > 0) {
+ throw new HyracksDataException("Deleting open file");
+ }
+ } finally {
+ fileMapManager.unregisterFile(fileId);
+ if (fInfo != null) {
+                    // Mark the fInfo as deleted so that, when its pages are
+                    // reclaimed in openFile(), they are only invalidated and
+                    // not flushed to disk.
+ ioManager.close(fInfo.getFileHandle());
+ fInfo.markAsDeleted();
+ }
}
- fileMapManager.unregisterFile(fileId);
}
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
new file mode 100644
index 0000000..d610c7e
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+
+/**
+ * IBufferCache decorator that forwards every call to the wrapped cache and
+ * counts successful pin/unpin and file create/delete/open/close calls in
+ * AtomicLong counters. The latch counters are exposed through getters but
+ * nothing in this class updates them; tryPin() calls are not counted either.
+ */
+public class DebugBufferCache implements IBufferCache {
+
+    // Every operation is forwarded to this wrapped cache.
+    private final IBufferCache bufferCache;
+    private final AtomicLong pinCount = new AtomicLong(); // eager init: the c'tor calls resetCounters()
+    private final AtomicLong unpinCount = new AtomicLong();
+    private final AtomicLong readLatchCount = new AtomicLong();
+    private final AtomicLong readUnlatchCount = new AtomicLong();
+    private final AtomicLong writeLatchCount = new AtomicLong();
+    private final AtomicLong writeUnlatchCount = new AtomicLong();
+    private final AtomicLong createFileCount = new AtomicLong();
+    private final AtomicLong deleteFileCount = new AtomicLong();
+    private final AtomicLong openFileCount = new AtomicLong();
+    private final AtomicLong closeFileCount = new AtomicLong();
+
+    public DebugBufferCache(IBufferCache bufferCache) {
+        this.bufferCache = bufferCache;
+        resetCounters();
+    }
+
+    @Override
+    public void createFile(FileReference fileRef) throws HyracksDataException {
+        bufferCache.createFile(fileRef);
+        createFileCount.incrementAndGet();
+    }
+
+    @Override
+    public void openFile(int fileId) throws HyracksDataException {
+        bufferCache.openFile(fileId);
+        openFileCount.incrementAndGet();
+    }
+
+    @Override
+    public void closeFile(int fileId) throws HyracksDataException {
+        bufferCache.closeFile(fileId);
+        closeFileCount.incrementAndGet();
+    }
+
+    @Override
+    public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
+        bufferCache.deleteFile(fileId, flushDirtyPages);
+        deleteFileCount.incrementAndGet();
+    }
+
+    @Override
+    public ICachedPage tryPin(long dpid) throws HyracksDataException {
+        return bufferCache.tryPin(dpid);
+    }
+
+    @Override
+    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
+        ICachedPage page = bufferCache.pin(dpid, newPage);
+        pinCount.incrementAndGet();
+        return page;
+    }
+
+    @Override
+    public void unpin(ICachedPage page) throws HyracksDataException {
+        bufferCache.unpin(page);
+        unpinCount.incrementAndGet();
+    }
+
+    @Override
+    public int getPageSize() {
+        return bufferCache.getPageSize();
+    }
+
+    @Override
+    public int getNumPages() {
+        return bufferCache.getNumPages();
+    }
+
+    @Override
+    public void close() {
+        bufferCache.close();
+    }
+
+    public void resetCounters() {
+        pinCount.set(0);
+        unpinCount.set(0);
+        readLatchCount.set(0);
+        readUnlatchCount.set(0);
+        writeLatchCount.set(0);
+        writeUnlatchCount.set(0);
+        createFileCount.set(0);
+        deleteFileCount.set(0);
+        openFileCount.set(0);
+        closeFileCount.set(0);
+    }
+
+    public long getPinCount() {
+        return pinCount.get();
+    }
+
+    public long getUnpinCount() {
+        return unpinCount.get();
+    }
+
+    public long getReadLatchCount() {
+        return readLatchCount.get();
+    }
+
+    public long getReadUnlatchCount() {
+        return readUnlatchCount.get();
+    }
+
+    public long getWriteLatchCount() {
+        return writeLatchCount.get();
+    }
+
+    public long getWriteUnlatchCount() {
+        return writeUnlatchCount.get();
+    }
+
+    public long getCreateFileCount() {
+        return createFileCount.get();
+    }
+
+    public long getDeleteFileCount() {
+        return deleteFileCount.get();
+    }
+
+    public long getOpenFileCount() {
+        return openFileCount.get();
+    }
+
+    public long getCloseFileCount() {
+        return closeFileCount.get();
+    }
+
+    @Override
+    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+        bufferCache.flushDirtyPage(page);
+    }
+
+    @Override
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+        bufferCache.force(fileId, metadata);
+    }
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index e82c01b..e8b407e 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -24,14 +24,18 @@
public void closeFile(int fileId) throws HyracksDataException;
- public void deleteFile(int fileId) throws HyracksDataException;
-
+ public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
+
public ICachedPage tryPin(long dpid) throws HyracksDataException;
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
public void unpin(ICachedPage page) throws HyracksDataException;
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
+
+ public void force(int fileId, boolean metadata) throws HyracksDataException;
+
public int getPageSize();
public int getNumPages();
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
index 3137f20..ac062d2 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/BufferedFileHandle.java
@@ -37,6 +37,14 @@
return handle;
}
+ public void markAsDeleted() {
+ handle = null;
+ }
+
+ public boolean fileHasBeenDeleted() {
+ return handle == null;
+ }
+
public int incReferenceCount() {
return refCount.incrementAndGet();
}