[ASTERIXDB-2367][STO] Various Fixes for BufferCache

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure fileInfo is deleted from FileInfoMap when delete a file from
buffer cache
- When delete old components after merge, ensure components are first
purged to avoid sweep old pages
- When open/create/delete files from buffer cache, use file-level
synchronization to reduce blocking. Certain operations such as
createFile/deleteFile can take long time if the disk is busy.

Change-Id: I8a199d3e83592425ab5055bd12bde519e80deb13
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2604
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 7de716a..1f845bf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -71,7 +71,7 @@
      * Mutables
      */
     private int workspaceIndex;
-    private IFileDeviceResolver deviceComputer;
+    private final IFileDeviceResolver deviceComputer;
 
     public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer) throws HyracksDataException {
         this.ioDevices = Collections.unmodifiableList(devices);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index 908af86..329a54b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -59,13 +59,6 @@
     }
 
     @Override
-    public void deactivateAndDestroy() throws HyracksDataException {
-        super.deactivateAndDestroy();
-        getBloomFilter().deactivate();
-        getBloomFilter().destroy();
-    }
-
-    @Override
     public void destroy() throws HyracksDataException {
         super.destroy();
         getBloomFilter().destroy();
@@ -78,9 +71,8 @@
     }
 
     @Override
-    public void deactivateAndPurge() throws HyracksDataException {
-        super.deactivateAndPurge();
-        getBloomFilter().deactivate();
+    protected void purge() throws HyracksDataException {
+        super.purge();
         getBloomFilter().purge();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
index c2f52e0..cace9e5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
@@ -52,13 +52,6 @@
     }
 
     @Override
-    public void deactivateAndDestroy() throws HyracksDataException {
-        super.deactivateAndDestroy();
-        getBuddyIndex().deactivate();
-        getBuddyIndex().destroy();
-    }
-
-    @Override
     public void destroy() throws HyracksDataException {
         super.destroy();
         getBuddyIndex().destroy();
@@ -71,9 +64,8 @@
     }
 
     @Override
-    public void deactivateAndPurge() throws HyracksDataException {
-        super.deactivateAndPurge();
-        getBuddyIndex().deactivate();
+    protected void purge() throws HyracksDataException {
+        super.purge();
         getBuddyIndex().purge();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 0aa5775..aa312fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -175,9 +175,15 @@
     }
 
     @Override
-    public void deactivateAndDestroy() throws HyracksDataException {
-        getIndex().deactivate();
-        getIndex().destroy();
+    public final void deactivateAndDestroy() throws HyracksDataException {
+        deactivateAndPurge();
+        destroy();
+    }
+
+    @Override
+    public final void deactivateAndPurge() throws HyracksDataException {
+        deactivate();
+        purge();
     }
 
     @Override
@@ -190,9 +196,7 @@
         getIndex().deactivate();
     }
 
-    @Override
-    public void deactivateAndPurge() throws HyracksDataException {
-        getIndex().deactivate();
+    protected void purge() throws HyracksDataException {
         getIndex().purge();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 9f82f02..55ed75e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -77,7 +77,7 @@
     private final Queue<BufferCacheHeaderHelper> headerPageCache = new ConcurrentLinkedQueue<>();
 
     //DEBUG
-    private Level fileOpsLevel = Level.DEBUG;
+    private static final Level fileOpsLevel = Level.DEBUG;
     private ArrayList<CachedPage> confiscatedPages;
     private Lock confiscateLock;
     private HashMap<CachedPage, StackTraceElement[]> confiscatedPagesOwner;
@@ -607,33 +607,36 @@
 
     void write(CachedPage cPage) throws HyracksDataException {
         BufferedFileHandle fInfo = getFileInfo(cPage);
+        if (fInfo == null) {
+            throw new IllegalStateException("Attempting to write non-existing file");
+        }
         // synchronize on fInfo to prevent the file handle from being deleted until the page is written.
         synchronized (fInfo) {
-            if (!fInfo.fileHasBeenDeleted()) {
-                ByteBuffer buf = cPage.buffer.duplicate();
-                final int totalPages = cPage.getFrameSizeMultiplier();
-                final int extraBlockPageId = cPage.getExtraBlockPageId();
-                final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId;
-                BufferCacheHeaderHelper header = checkoutHeaderHelper();
-                try {
-                    buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize);
-                    buf.position(0);
-                    long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(),
-                            getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)),
-                            header.prepareWrite(cPage, buf));
+            ByteBuffer buf = cPage.buffer.duplicate();
+            final int totalPages = cPage.getFrameSizeMultiplier();
+            final int extraBlockPageId = cPage.getExtraBlockPageId();
+            final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId;
+            BufferCacheHeaderHelper header = checkoutHeaderHelper();
+            try {
+                buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize);
+                buf.position(0);
+                long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(),
+                        getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareWrite(cPage, buf));
 
-                    if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0)
-                            + getPageSizeWithHeader()) {
-                        throw new HyracksDataException("Failed to write completely: " + bytesWritten);
-                    }
-                } finally {
-                    returnHeaderHelper(header);
+                if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0)
+                        + getPageSizeWithHeader()) {
+                    throw new HyracksDataException("Failed to write completely: " + bytesWritten);
                 }
-                if (totalPages > 1 && !contiguousLargePages) {
-                    buf.limit(totalPages * pageSize);
-                    ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf);
-                }
-                assert buf.capacity() == (pageSize * totalPages);
+            } finally {
+                returnHeaderHelper(header);
+            }
+            if (totalPages > 1 && !contiguousLargePages) {
+                buf.limit(totalPages * pageSize);
+                ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf);
+            }
+            if (buf.capacity() != pageSize * totalPages) {
+                throw new IllegalStateException("Illegal number of bytes written, expected bytes written: "
+                        + pageSize * totalPages + " actual bytes writte: " + buf.capacity());
             }
         }
     }
@@ -786,11 +789,8 @@
         synchronized (fileInfoMap) {
             fileInfoMap.forEach((key, value) -> {
                 try {
-                    boolean fileHasBeenDeleted = value.fileHasBeenDeleted();
-                    sweepAndFlush(key, !fileHasBeenDeleted);
-                    if (!fileHasBeenDeleted) {
-                        ioManager.close(value.getFileHandle());
-                    }
+                    sweepAndFlush(key, true);
+                    ioManager.close(value.getFileHandle());
                 } catch (HyracksDataException e) {
                     if (LOGGER.isWarnEnabled()) {
                         LOGGER.log(Level.WARN, "Error flushing file id: " + key, e);
@@ -827,15 +827,15 @@
         if (LOGGER.isEnabled(fileOpsLevel)) {
             LOGGER.log(fileOpsLevel, "Opening file: " + fileRef + " in cache: " + this);
         }
-        int fileId;
+        int fileId = -1;
         synchronized (fileInfoMap) {
             if (fileMapManager.isMapped(fileRef)) {
                 fileId = fileMapManager.lookupFileId(fileRef);
             } else {
                 fileId = fileMapManager.registerFile(fileRef);
             }
-            openFile(fileId);
         }
+        openFile(fileId);
         return fileId;
     }
 
@@ -844,42 +844,60 @@
         if (LOGGER.isEnabled(fileOpsLevel)) {
             LOGGER.log(fileOpsLevel, "Opening file: " + fileId + " in cache: " + this);
         }
-        synchronized (fileInfoMap) {
-            BufferedFileHandle fInfo;
-            fInfo = fileInfoMap.get(fileId);
-            if (fInfo == null) {
-                boolean unreferencedFileFound = true;
-                while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
-                    // map is full, make room by cleaning up unreferenced files
-                    unreferencedFileFound = false;
-                    for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
-                        if (entry.getValue().getReferenceCount() <= 0) {
-                            int entryFileId = entry.getKey();
-                            boolean fileHasBeenDeleted = entry.getValue().fileHasBeenDeleted();
-                            sweepAndFlush(entryFileId, !fileHasBeenDeleted);
-                            if (!fileHasBeenDeleted) {
-                                ioManager.close(entry.getValue().getFileHandle());
-                            }
-                            fileInfoMap.remove(entryFileId);
-                            unreferencedFileFound = true;
-                            // for-each iterator is invalid because we changed
-                            // fileInfoMap
-                            break;
+        BufferedFileHandle fInfo = null;
+        try {
+            fInfo = getOrCreateFileHandle(fileId);
+            if (fInfo.getFileHandle() == null) {
+                // a new file
+                synchronized (fInfo) {
+                    // prevent concurrent opening of the same file
+                    if (fInfo.getFileHandle() == null) {
+                        if (fileInfoMap.size() > maxOpenFiles) {
+                            closeOpeningFiles(fInfo);
                         }
+                        // create, open, and map new file reference
+                        FileReference fileRef = fileMapManager.lookupFileName(fileId);
+                        IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                        fInfo.setFileHandle(fh);
                     }
                 }
-                if (fileInfoMap.size() >= maxOpenFiles) {
-                    throw new HyracksDataException("Could not open fileId " + fileId + ". Max number of files "
-                            + maxOpenFiles + " already opened and referenced.");
-                }
-                // create, open, and map new file reference
-                FileReference fileRef = fileMapManager.lookupFileName(fileId);
-                IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                        IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-                fInfo = new BufferedFileHandle(fileId, fh);
-                fileInfoMap.put(fileId, fInfo);
             }
             fInfo.incReferenceCount();
+        } catch (Exception e) {
+            removeFileInfo(fileId);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void closeOpeningFiles(BufferedFileHandle newFileHandle) throws HyracksDataException {
+        synchronized (fileInfoMap) {
+            boolean unreferencedFileFound = true;
+            while (fileInfoMap.size() > maxOpenFiles && unreferencedFileFound) {
+                // map is full, make room by cleaning up unreferenced files
+                unreferencedFileFound = false;
+                for (Map.Entry<Integer, BufferedFileHandle> entry : fileInfoMap.entrySet()) {
+                    BufferedFileHandle fh = entry.getValue();
+                    if (fh != newFileHandle && fh.getReferenceCount() <= 0) {
+                        if (fh.getReferenceCount() < 0) {
+                            throw new IllegalStateException("Illegal reference count " + fh.getReferenceCount()
+                                    + " of file " + fh.getFileHandle().getFileReference());
+                        }
+                        int entryFileId = entry.getKey();
+                        sweepAndFlush(entryFileId, true);
+                        ioManager.close(entry.getValue().getFileHandle());
+                        fileInfoMap.remove(entryFileId);
+                        unreferencedFileFound = true;
+                        // for-each iterator is invalid because we changed
+                        // fileInfoMap
+                        break;
+                    }
+                }
+            }
+            if (fileInfoMap.size() > maxOpenFiles) {
+                throw new HyracksDataException("Could not open fileId " + newFileHandle.getFileId()
+                        + ". Max number of files " + maxOpenFiles + " already opened and referenced.");
+            }
         }
     }
 
@@ -977,15 +995,19 @@
 
     @Override
     public void deleteFile(FileReference fileRef) throws HyracksDataException {
+        boolean mapped = false;
+        int fileId = -1;
         synchronized (fileInfoMap) {
             if (fileMapManager.isMapped(fileRef)) {
-                int fileId = fileMapManager.lookupFileId(fileRef);
-                deleteFile(fileId);
-                return;
-            } else {
-                IoUtil.delete(fileRef);
+                mapped = true;
+                fileId = fileMapManager.lookupFileId(fileRef);
             }
         }
+        if (mapped) {
+            deleteFile(fileId);
+        } else {
+            IoUtil.delete(fileRef);
+        }
     }
 
     @Override
@@ -993,35 +1015,30 @@
         if (LOGGER.isEnabled(fileOpsLevel)) {
             LOGGER.log(fileOpsLevel, "Deleting file: " + fileId + " in cache: " + this);
         }
-        synchronized (fileInfoMap) {
-            sweepAndFlush(fileId, false);
-            BufferedFileHandle fInfo = null;
+        BufferedFileHandle fInfo = removeFileInfo(fileId);
+        if (fInfo == null) {
+            return;
+        }
+        sweepAndFlush(fileId, false);
+        try {
+            if (fInfo.getReferenceCount() > 0) {
+                throw new HyracksDataException("Deleting open file");
+            }
+        } finally {
+            FileReference fileRef = null;
             try {
-                fInfo = fileInfoMap.get(fileId);
-                if (fInfo != null && fInfo.getReferenceCount() > 0) {
-                    throw new HyracksDataException("Deleting open file");
+                synchronized (fileInfoMap) {
+                    fileRef = fileMapManager.unregisterFile(fileId);
                 }
-            } catch (Exception e) {
-                throw HyracksDataException.create(e);
             } finally {
-                FileReference fileRef = fileMapManager.unregisterFile(fileId);
                 try {
-                    if (fInfo != null) {
-                        // Mark the fInfo as deleted,
-                        // such that when its pages are reclaimed in openFile(),
-                        // the pages are not flushed to disk but only invalidated.
-                        synchronized (fInfo) {
-                            if (!fInfo.fileHasBeenDeleted()) {
-                                ioManager.close(fInfo.getFileHandle());
-                                fInfo.markAsDeleted();
-                            }
-                        }
-                    }
+                    ioManager.close(fInfo.getFileHandle());
                 } finally {
                     IoUtil.delete(fileRef);
                 }
             }
         }
+
     }
 
     @Override
@@ -1264,6 +1281,18 @@
         return null;
     }
 
+    private BufferedFileHandle getOrCreateFileHandle(int fileId) {
+        synchronized (fileInfoMap) {
+            return fileInfoMap.computeIfAbsent(fileId, id -> new BufferedFileHandle(fileId, null));
+        }
+    }
+
+    private BufferedFileHandle removeFileInfo(int fileId) {
+        synchronized (fileInfoMap) {
+            return fileInfoMap.remove(fileId);
+        }
+    }
+
     private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate) throws HyracksDataException {
         final long startingPinCount = DEBUG ? masterPinCount.get() : -1;
         int cycleCount = 0;
@@ -1404,14 +1433,14 @@
 
     @Override
     public void purgeHandle(int fileId) throws HyracksDataException {
-        synchronized (fileInfoMap) {
-            BufferedFileHandle fh = fileInfoMap.get(fileId);
-            if (fh != null) {
-                ioManager.close(fh.getFileHandle());
-                fileInfoMap.remove(fileId);
+        BufferedFileHandle fh = removeFileInfo(fileId);
+        if (fh != null) {
+            synchronized (fileInfoMap) {
                 fileMapManager.unregisterFile(fileId);
             }
+            ioManager.close(fh.getFileHandle());
         }
+
     }
 
     static class BufferCacheHeaderHelper {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
index 177128e..62e7888 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
@@ -37,18 +37,14 @@
         return fileId;
     }
 
+    public void setFileHandle(IFileHandle fileHandle) {
+        this.handle = fileHandle;
+    }
+
     public IFileHandle getFileHandle() {
         return handle;
     }
 
-    public void markAsDeleted() {
-        handle = null;
-    }
-
-    public boolean fileHasBeenDeleted() {
-        return handle == null;
-    }
-
     public int incReferenceCount() {
         return refCount.incrementAndGet();
     }