[ASTERIXDB-2367][STO] Various Fixes for BufferCache
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ensure fileInfo is deleted from FileInfoMap when delete a file from
buffer cache
- When delete old components after merge, ensure components are first
purged to avoid sweep old pages
- When open/create/delete files from buffer cache, use file-level
synchronization to reduce blocking. Certain operations such as
createFile/deleteFile can take long time if the disk is busy.
Change-Id: I8a199d3e83592425ab5055bd12bde519e80deb13
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2604
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 7de716a..1f845bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -71,7 +71,7 @@
* Mutables
*/
private int workspaceIndex;
- private IFileDeviceResolver deviceComputer;
+ private final IFileDeviceResolver deviceComputer;
public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException {
this.ioDevices = Collections.unmodifiableList(devices);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index 908af86..329a54b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -59,13 +59,6 @@
}
@Override
- public void deactivateAndDestroy() throws HyracksDataException {
- super.deactivateAndDestroy();
- getBloomFilter().deactivate();
- getBloomFilter().destroy();
- }
-
- @Override
public void destroy() throws HyracksDataException {
super.destroy();
getBloomFilter().destroy();
@@ -78,9 +71,8 @@
}
@Override
- public void deactivateAndPurge() throws HyracksDataException {
- super.deactivateAndPurge();
- getBloomFilter().deactivate();
+ protected void purge() throws HyracksDataException {
+ super.purge();
getBloomFilter().purge();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
index c2f52e0..cace9e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
@@ -52,13 +52,6 @@
}
@Override
- public void deactivateAndDestroy() throws HyracksDataException {
- super.deactivateAndDestroy();
- getBuddyIndex().deactivate();
- getBuddyIndex().destroy();
- }
-
- @Override
public void destroy() throws HyracksDataException {
super.destroy();
getBuddyIndex().destroy();
@@ -71,9 +64,8 @@
}
@Override
- public void deactivateAndPurge() throws HyracksDataException {
- super.deactivateAndPurge();
- getBuddyIndex().deactivate();
+ protected void purge() throws HyracksDataException {
+ super.purge();
getBuddyIndex().purge();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 0aa5775..aa312fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -175,9 +175,15 @@
}
@Override
- public void deactivateAndDestroy() throws HyracksDataException {
- getIndex().deactivate();
- getIndex().destroy();
+ public final void deactivateAndDestroy() throws HyracksDataException {
+ deactivateAndPurge();
+ destroy();
+ }
+
+ @Override
+ public final void deactivateAndPurge() throws HyracksDataException {
+ deactivate();
+ purge();
}
@Override
@@ -190,9 +196,7 @@
getIndex().deactivate();
}
- @Override
- public void deactivateAndPurge() throws HyracksDataException {
- getIndex().deactivate();
+ protected void purge() throws HyracksDataException {
getIndex().purge();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 9f82f02..55ed75e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -77,7 +77,7 @@
private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
//DEBUG
- private Level fileOpsLevel = Level.DEBUG;
+ private static final Level fileOpsLevel = Level.DEBUG;
private ArrayList<CachedPage> confiscatedPages;
private Lock confiscateLock;
private HashMap<CachedPage, StackTraceElement[]> confiscatedPagesOwner;
@@ -607,33 +607,36 @@
void write(CachedPage cPage) throws HyracksDataException {
BufferedFileHandle fInfo = getFileInfo(cPage);
+ if (fInfo == null) {
+ throw new IllegalStateException("Attempting to write non-existing file");
+ }
// synchronize on fInfo to prevent the file handle from being deleted until the page is written.
synchronized (fInfo) {
- if (!fInfo.fileHasBeenDeleted()) {
- ByteBuffer buf = cPage.buffer.duplicate();
- final int totalPages = cPage.getFrameSizeMultiplier();
- final int extraBlockPageId = cPage.getExtraBlockPageId();
- final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId;
- BufferCacheHeaderHelper header = checkoutHeaderHelper();
- try {
- buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize);
- buf.position(0);
- long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(),
- getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)),
- header.prepareWrite(cPage, buf));
+ ByteBuffer buf = cPage.buffer.duplicate();
+ final int totalPages = cPage.getFrameSizeMultiplier();
+ final int extraBlockPageId = cPage.getExtraBlockPageId();
+ final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId;
+ BufferCacheHeaderHelper header = checkoutHeaderHelper();
+ try {
+ buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize);
+ buf.position(0);
+ long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(),
+ getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareWrite(cPage, buf));
- if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0)
- + getPageSizeWithHeader()) {
- throw new HyracksDataException("Failed to write completely: " + bytesWritten);
- }
- } finally {
- returnHeaderHelper(header);
+ if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0)
+ + getPageSizeWithHeader()) {
+ throw new HyracksDataException("Failed to write completely: " + bytesWritten);
}
- if (totalPages > 1 && !contiguousLargePages) {
- buf.limit(totalPages * pageSize);
- ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf);
- }
- assert buf.capacity() == (pageSize * totalPages);
+ } finally {
+ returnHeaderHelper(header);
+ }
+ if (totalPages > 1 && !contiguousLargePages) {
+ buf.limit(totalPages * pageSize);
+ ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf);
+ }
+ if (buf.capacity() != pageSize * totalPages) {
+ throw new IllegalStateException("Illegal number of bytes written, expected bytes written: "
+ + pageSize * totalPages + " actual bytes writte: " + buf.capacity());
}
}
}
@@ -786,11 +789,8 @@
synchronized (fileInfoMap) {
fileInfoMap.forEach((key, value) -> {
try {
- boolean fileHasBeenDeleted = value.fileHasBeenDeleted();
- sweepAndFlush(key, !fileHasBeenDeleted);
- if (!fileHasBeenDeleted) {
- ioManager.close(value.getFileHandle());
- }
+ sweepAndFlush(key, true);
+ ioManager.close(value.getFileHandle());
} catch (HyracksDataException e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Error flushing file id: " + key, e);
@@ -827,15 +827,15 @@
if (LOGGER.isEnabled(fileOpsLevel)) {
LOGGER.log(fileOpsLevel, "Opening file: " + fileRef + " in cache: " + this);
}
- int fileId;
+ int fileId = -1;
synchronized (fileInfoMap) {
if (fileMapManager.isMapped(fileRef)) {
fileId = fileMapManager.lookupFileId(fileRef);
} else {
fileId = fileMapManager.registerFile(fileRef);
}
- openFile(fileId);
}
+ openFile(fileId);
return fileId;
}
@@ -844,42 +844,60 @@
if (LOGGER.isEnabled(fileOpsLevel)) {
LOGGER.log(fileOpsLevel, "Opening file: " + fileId + " in cache: " + this);
}
- synchronized (fileInfoMap) {
- BufferedFileHandle fInfo;
- fInfo = fileInfoMap.get(fileId);
- if (fInfo == null) {
- boolean unreferencedFileFound = true;
- while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
- // map is full, make room by cleaning up unreferenced files
- unreferencedFileFound = false;
- for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
- if (entry.getValue().getReferenceCount() <= 0) {
- int entryFileId = entry.getKey();
- boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
- sweepAndFlush(entryFileId, !fileHasBeenDeleted);
- if (!fileHasBeenDeleted) {
- ioManager.close(entry.getValue().getFileHandle());
- }
- fileInfoMap.remove(entryFileId);
- unreferencedFileFound = true;
- // for-each iterator is invalid because we changed
- // fileInfoMap
- break;
+ BufferedFileHandle fInfo = null;
+ try {
+ fInfo = getOrCreateFileHandle(fileId);
+ if (fInfo.getFileHandle() == null) {
+ // a new file
+ synchronized (fInfo) {
+ // prevent concurrent opening of the same file
+ if (fInfo.getFileHandle() == null) {
+ if (fileInfoMap.size() > maxOpenFiles) {
+ closeOpeningFiles(fInfo);
}
+ // create, open, and map new file reference
+ FileReference fileRef = fileMapManager.lookupFileName(fileId);
+ IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ fInfo.setFileHandle(fh);
}
}
- if (fileInfoMap.size() >= maxOpenFiles) {
- throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files "
- + maxOpenFiles + " already opened and referenced.");
- }
- // create, open, and map new file reference
- FileReference fileRef = fileMapManager.lookupFileName(fileId);
- IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- fInfo = new BufferedFileHandle(fileId, fh);
- fileInfoMap.put(fileId, fInfo);
}
fInfo.incReferenceCount();
+ } catch (Exception e) {
+ removeFileInfo(fileId);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void closeOpeningFiles(BufferedFileHandle newFileHandle) throws HyracksDataException {
+ synchronized (fileInfoMap) {
+ boolean unreferencedFileFound = true;
+ while (fileInfoMap.size() > maxOpenFiles && unreferencedFileFound) {
+ // map is full, make room by cleaning up unreferenced files
+ unreferencedFileFound = false;
+ for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+ BufferedFileHandle fh = entry.getValue();
+ if (fh != newFileHandle && fh.getReferenceCount() <= 0) {
+ if (fh.getReferenceCount() < 0) {
+ throw new IllegalStateException("Illegal reference count " + fh.getReferenceCount()
+ + " of file " + fh.getFileHandle().getFileReference());
+ }
+ int entryFileId = entry.getKey();
+ sweepAndFlush(entryFileId, true);
+ ioManager.close(entry.getValue().getFileHandle());
+ fileInfoMap.remove(entryFileId);
+ unreferencedFileFound = true;
+ // for-each iterator is invalid because we changed
+ // fileInfoMap
+ break;
+ }
+ }
+ }
+ if (fileInfoMap.size() > maxOpenFiles) {
+ throw new HyracksDataException("Could not open fileId " + newFileHandle.getFileId()
+ + ". Max number of files " + maxOpenFiles + " already opened and referenced.");
+ }
}
}
@@ -977,15 +995,19 @@
@Override
public void deleteFile(FileReference fileRef) throws HyracksDataException {
+ boolean mapped = false;
+ int fileId = -1;
synchronized (fileInfoMap) {
if (fileMapManager.isMapped(fileRef)) {
- int fileId = fileMapManager.lookupFileId(fileRef);
- deleteFile(fileId);
- return;
- } else {
- IoUtil.delete(fileRef);
+ mapped = true;
+ fileId = fileMapManager.lookupFileId(fileRef);
}
}
+ if (mapped) {
+ deleteFile(fileId);
+ } else {
+ IoUtil.delete(fileRef);
+ }
}
@Override
@@ -993,35 +1015,30 @@
if (LOGGER.isEnabled(fileOpsLevel)) {
LOGGER.log(fileOpsLevel, "Deleting file: " + fileId + " in cache: " + this);
}
- synchronized (fileInfoMap) {
- sweepAndFlush(fileId, false);
- BufferedFileHandle fInfo = null;
+ BufferedFileHandle fInfo = removeFileInfo(fileId);
+ if (fInfo == null) {
+ return;
+ }
+ sweepAndFlush(fileId, false);
+ try {
+ if (fInfo.getReferenceCount() > 0) {
+ throw new HyracksDataException("Deleting open file");
+ }
+ } finally {
+ FileReference fileRef = null;
try {
- fInfo = fileInfoMap.get(fileId);
- if (fInfo != null && fInfo.getReferenceCount() > 0) {
- throw new HyracksDataException("Deleting open file");
+ synchronized (fileInfoMap) {
+ fileRef = fileMapManager.unregisterFile(fileId);
}
- } catch (Exception e) {
- throw HyracksDataException.create(e);
} finally {
- FileReference fileRef = fileMapManager.unregisterFile(fileId);
try {
- if (fInfo != null) {
- // Mark the fInfo as deleted,
- // such that when its pages are reclaimed in openFile(),
- // the pages are not flushed to disk but only invalidated.
- synchronized (fInfo) {
- if (!fInfo.fileHasBeenDeleted()) {
- ioManager.close(fInfo.getFileHandle());
- fInfo.markAsDeleted();
- }
- }
- }
+ ioManager.close(fInfo.getFileHandle());
} finally {
IoUtil.delete(fileRef);
}
}
}
+
}
@Override
@@ -1264,6 +1281,18 @@
return null;
}
+ private BufferedFileHandle getOrCreateFileHandle(int fileId) {
+ synchronized (fileInfoMap) {
+ return fileInfoMap.computeIfAbsent(fileId, id -> new BufferedFileHandle(fileId, null));
+ }
+ }
+
+ private BufferedFileHandle removeFileInfo(int fileId) {
+ synchronized (fileInfoMap) {
+ return fileInfoMap.remove(fileId);
+ }
+ }
+
private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate) throws HyracksDataException {
final long startingPinCount = DEBUG ? masterPinCount.get() : -1;
int cycleCount = 0;
@@ -1404,14 +1433,14 @@
@Override
public void purgeHandle(int fileId) throws HyracksDataException {
- synchronized (fileInfoMap) {
- BufferedFileHandle fh = fileInfoMap.get(fileId);
- if (fh != null) {
- ioManager.close(fh.getFileHandle());
- fileInfoMap.remove(fileId);
+ BufferedFileHandle fh = removeFileInfo(fileId);
+ if (fh != null) {
+ synchronized (fileInfoMap) {
fileMapManager.unregisterFile(fileId);
}
+ ioManager.close(fh.getFileHandle());
}
+
}
static class BufferCacheHeaderHelper {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 177128e..62e7888 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -37,18 +37,14 @@
return fileId;
}
+ public void setFileHandle(IFileHandle fileHandle) {
+ this.handle = fileHandle;
+ }
+
public IFileHandle getFileHandle() {
return handle;
}
- public void markAsDeleted() {
- handle = null;
- }
-
- public boolean fileHasBeenDeleted() {
- return handle == null;
- }
-
public int incReferenceCount() {
return refCount.incrementAndGet();
}