[ASTERIXDB-2599][STO] Cleanup compression LAFs

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Cleanup compression LAFs after merge.
- Delete LAF files when merged components contain
  previous components.
- Make sure the recovery after non-graceful shutdown
  (in the middle of cleanup) deletes the component and its LAF.

Change-Id: I17adb6145f7bf77470fd82f04321faf7a4007bf7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3498
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index ca5f968..b5d1c79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -148,9 +148,9 @@
             } else if (currentBTree.isWithin(lastBTree)
                     && (!hasBloomFilter || currentBloomFilter.isWithin(lastBloomFilter))) {
                 // Invalid files are completely contained in last interval.
-                delete(btreeFactory.getBufferCache(), currentBTree.getFullPath());
+                delete(btreeFactory.getBufferCache(), currentBTree.getFileRef());
                 if (hasBloomFilter) {
-                    delete(btreeFactory.getBufferCache(), currentBloomFilter.getFullPath());
+                    delete(btreeFactory.getBufferCache(), currentBloomFilter.getFileRef());
                 }
             } else {
                 // This scenario should not be possible.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index 8fb3751..81e6d10 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -139,9 +139,9 @@
             } else if (currentBTree.isWithin(lastBTree) && currentBuddyBTree.isWithin(lastBuddyBTree)
                     && currentBloomFilter.isWithin(lastBloomFilter)) {
                 // Invalid files are completely contained in last sequence.
-                delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
-                delete(treeFactory.getBufferCache(), currentBuddyBTree.getFullPath());
-                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBTree.getFileRef());
+                delete(treeFactory.getBufferCache(), currentBuddyBTree.getFileRef());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFileRef());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 7618264..35a11ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -189,7 +189,7 @@
             if (groundTruth.contains(cmpFileName.getSequence())) {
                 validFiles.add(cmpFileName);
             } else {
-                delete(bufferCache, cmpFileName.getFullPath());
+                delete(bufferCache, cmpFileName.getFileRef());
             }
         }
     }
@@ -256,7 +256,7 @@
                 // The current file is completely contained in the interval of the
                 // last file. Thus the last file must contain at least as much information
                 // as the current file, so delete the current file.
-                delete(treeFactory.getBufferCache(), current.getFullPath());
+                delete(treeFactory.getBufferCache(), current.getFileRef());
             } else {
                 // This scenario should not be possible since timestamps are monotonically increasing.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -363,8 +363,7 @@
         }
     }
 
-    protected void delete(IBufferCache bufferCache, String fullPath) throws HyracksDataException {
-        FileReference fileRef = ioManager.resolveAbsolutePath(fullPath);
+    protected void delete(IBufferCache bufferCache, FileReference fileRef) throws HyracksDataException {
         bufferCache.deleteFile(fileRef);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 4471102..3155801 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -149,9 +149,9 @@
             } else if (currentDeletedKeysBTree.isWithin(lastDeletedKeysBTree)
                     && currentDictBTree.isWithin(lastDictBTree) && currentBloomFilter.isWithin(lastBloomFilter)) {
                 // Invalid files are completely contained in last sequence.
-                delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.getFullPath());
-                delete(treeFactory.getBufferCache(), currentDictBTree.getFullPath());
-                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
+                delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.getFileRef());
+                delete(treeFactory.getBufferCache(), currentDictBTree.getFileRef());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFileRef());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 3348407..81e8f83 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -144,9 +144,9 @@
             } else if (currentRTree.isWithin(lastRTree) && currentBTree.isWithin(lastBTree)
                     && currentBloomFilter.isWithin(lastBloomFilter)) {
                 // Invalid files are completely contained in last sequence.
-                delete(treeFactory.getBufferCache(), currentRTree.getFullPath());
-                delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
-                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
+                delete(treeFactory.getBufferCache(), currentRTree.getFileRef());
+                delete(treeFactory.getBufferCache(), currentBTree.getFileRef());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFileRef());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
index 47e7534..a060972 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java
@@ -168,7 +168,7 @@
      * Check whether the file has ever been opened
      *
      * @return
-     *         true if has ever been open, false o.w
+     *         true if has ever been opened, false otherwise
      */
     public final boolean hasBeenOpened() {
         return hasOpen;
@@ -185,7 +185,7 @@
             try {
                 bufferCache.createFile(cFileRef.getLAFFileReference());
             } catch (HyracksDataException e) {
-                //In case of creating the LAF file failed, delete fileRef
+                //In case of creating the LAF file failed, delete index file reference
                 IoUtil.delete(fileRef);
                 throw e;
             }
@@ -193,12 +193,37 @@
     }
 
     public static void deleteFile(FileReference fileRef) throws HyracksDataException {
-        IoUtil.delete(fileRef);
-        if (fileRef.isCompressed()) {
-            final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
-            if (cFileRef.getFile().exists()) {
-                IoUtil.delete(cFileRef.getLAFFileReference());
+        HyracksDataException savedEx = null;
+
+        /*
+         * LAF file has to be deleted before the index file.
+         * If the index file deleted first and a non-graceful shutdown happened before the deletion of
+         * the LAF file, the LAF file will not be deleted during the next recovery.
+         */
+        try {
+            if (fileRef.isCompressed()) {
+                final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
+                final FileReference lafFileRef = cFileRef.getLAFFileReference();
+                if (lafFileRef.getFile().exists()) {
+                    IoUtil.delete(lafFileRef);
+                }
             }
+        } catch (HyracksDataException e) {
+            savedEx = e;
+        }
+
+        try {
+            IoUtil.delete(fileRef);
+        } catch (HyracksDataException e) {
+            if (savedEx != null) {
+                savedEx.addSuppressed(e);
+            } else {
+                savedEx = e;
+            }
+        }
+
+        if (savedEx != null) {
+            throw savedEx;
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
index 70d1388..7d0cc62 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java
@@ -42,52 +42,128 @@
     protected static final int ENTRY_LENGTH = 16; //<offset(8-bytes),size(8-bytes)>
     protected static final int EOF = -1;
     private static final EnumSet<State> CLOSED = EnumSet.of(State.CLOSED);
-    private static final EnumSet<State> READABLE_WRITABLE = EnumSet.of(State.READABLE, State.WRITABLE);
+    private static final EnumSet<State> CLOSED_OR_INVALID = EnumSet.of(State.CLOSED, State.INVALID);
+    private static final EnumSet<State> OPEN = EnumSet.of(State.READABLE, State.WRITABLE);
+    private static final EnumSet<State> OPEN_OR_INVALID = EnumSet.of(State.READABLE, State.WRITABLE, State.INVALID);
     private static final EnumSet<State> READABLE = EnumSet.of(State.READABLE);
     private static final EnumSet<State> WRITABLE = EnumSet.of(State.WRITABLE);
 
     private enum State {
         READABLE,
         WRITABLE,
+        /*
+         * INVALID state means that the LAF does not exists.
+         * This could happened after deleting the LAF file but not the index file (e.g., non-graceful shutdown).
+         * To account for this case, we should indicate the index is not valid and the index should be deleted by the
+         * recovery manager.
+         */
+        INVALID,
         CLOSED
     }
 
     private final IBufferCache bufferCache;
-    private final int fileId;
     private final ICompressorDecompressor compressorDecompressor;
+    private final CompressedFileReference fileRef;
 
+    private int fileId;
     private State state;
     private int totalNumOfPages;
 
     private LAFWriter lafWriter;
 
-    public CompressedFileManager(IBufferCache bufferCache, int fileId, CompressedFileReference fileRef) {
+    public CompressedFileManager(IBufferCache bufferCache, CompressedFileReference fileRef) {
         state = State.CLOSED;
         totalNumOfPages = 0;
         this.bufferCache = bufferCache;
-        this.fileId = fileId;
+        this.fileRef = fileRef;
         this.compressorDecompressor = fileRef.getCompressorDecompressor();
     }
 
+    /* ************************
+     * File handling methods
+     * ************************
+     */
+
     /**
+     * Open LAF file
      * If the file is empty (i.e. the number of pages is zero)
      * Then the state will be WRITABLE.
      *
-     * @throws HyracksDataException
+     * @return
+     *         true if the file exists and was closed
+     *         false otherwise
+     * @throws IllegalStateException
+     *             if the the file is not in CLOSED state
      */
-    public void open() throws HyracksDataException {
+    public boolean open() throws HyracksDataException {
         ensureState(CLOSED);
+
+        boolean open = false;
+        if (fileRef.getLAFFileReference().getFile().exists()) {
+            fileId = bufferCache.openFile(fileRef.getLAFFileReference());
+            open = true;
+        } else {
+            fileId = -1;
+        }
         changeToFunctionalState();
+        return open;
     }
 
     /**
-     * Close the LAF file.
+     * Close LAF file
      *
-     * @throws HyracksDataException
+     * @return
+     *         true if the LAF file exists and was OPEN
+     *         false otherwise
+     * @throws IllegalStateException
+     *             if the the file is not in OPEN state
      */
-    public void close() {
-        ensureState(READABLE_WRITABLE);
-        state = State.CLOSED;
+    public boolean close() throws HyracksDataException {
+        ensureState(OPEN_OR_INVALID);
+
+        boolean closed = false;
+        if (state != State.INVALID) {
+            bufferCache.closeFile(fileId);
+            state = State.CLOSED;
+            closed = true;
+        }
+        return closed;
+    }
+
+    /**
+     * Close and purge LAF file
+     *
+     * @throws IllegalStateException
+     *             if the the file is not in OPEN state
+     */
+    public void purge() throws HyracksDataException {
+        if (close()) {
+            bufferCache.purgeHandle(fileId);
+        }
+    }
+
+    /**
+     * Delete LAF file
+     *
+     * @throws IllegalStateException
+     *             if the the file is not in CLOSED state
+     */
+    public void delete() throws HyracksDataException {
+        ensureState(CLOSED_OR_INVALID);
+        if (state != State.INVALID) {
+            bufferCache.deleteFile(fileId);
+        }
+    }
+
+    /**
+     * Force LAF file content to disk
+     *
+     * @throws IllegalStateException
+     *             if the the file is not in CLOSED state
+     */
+    public void force(boolean metadata) throws HyracksDataException {
+        ensureState(OPEN);
+        bufferCache.force(fileId, metadata);
     }
 
     /* ************************
@@ -95,6 +171,13 @@
      * ************************
      */
 
+    /*
+     * Should be only visible to LAFWriter
+     */
+    int getFileId() {
+        return fileId;
+    }
+
     public ICompressedPageWriter getCompressedPageWriter() {
         ensureState(WRITABLE);
         return lafWriter;
@@ -124,7 +207,8 @@
      * @param extraPageIndex
      *            the index of the extra page (starting from 0)
      * @return offset for the compressed page.
-     * @throws HyracksDataException
+     * @throws IllegalStateException
+     *             If the file is not in WRITABLE state
      */
     public long writeExtraPageInfo(int extraPageId, long size, int extraPageIndex) throws HyracksDataException {
         ensureState(WRITABLE);
@@ -146,7 +230,8 @@
      *
      * @param totalNumOfPages
      *            The total number of pages of the index
-     * @throws HyracksDataException
+     * @throws IllegalStateException
+     *             If the file is not in WRITABLE state
      */
     void endWriting(int totalNumOfPages) {
         ensureState(WRITABLE);
@@ -164,7 +249,6 @@
      * Set the compressed page offset and size
      *
      * @param compressedPage
-     * @throws HyracksDataException
      */
     public void setCompressedPageInfo(ICachedPageInternal compressedPage) throws HyracksDataException {
         setCompressedPageInfo(BufferedFileHandle.getPageId(compressedPage.getDiskPageId()), compressedPage);
@@ -175,7 +259,6 @@
      *
      * @param compressedPage
      * @param extraPageIndex
-     * @throws HyracksDataException
      */
     public void setExtraCompressedPageInfo(ICachedPageInternal compressedPage, int extraPageIndex)
             throws HyracksDataException {
@@ -189,17 +272,11 @@
 
     /**
      * Get the number of compressed pages
-     *
-     * @return
      */
     public int getNumberOfPages() {
         return totalNumOfPages;
     }
 
-    public int getFileId() {
-        return fileId;
-    }
-
     public ICompressorDecompressor getCompressorDecompressor() {
         return compressorDecompressor;
     }
@@ -217,7 +294,9 @@
     }
 
     private void changeToFunctionalState() throws HyracksDataException {
-        if (bufferCache.getNumPagesOfFile(fileId) == 0) {
+        if (fileId == -1) {
+            state = State.INVALID;
+        } else if (bufferCache.getNumPagesOfFile(fileId) == 0) {
             state = State.WRITABLE;
             lafWriter = new LAFWriter(this, bufferCache);
         } else {
@@ -281,5 +360,4 @@
             bufferCache.unpin(page);
         }
     }
-
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 235e144..b6178c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -157,9 +157,7 @@
     @Override
     public void open(FileReference fileRef) throws HyracksDataException {
         final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
-        final int lafFileId = bufferCache.openFile(cFileRef.getLAFFileReference());
-
-        compressedFileManager = new CompressedFileManager(bufferCache, lafFileId, cFileRef);
+        compressedFileManager = new CompressedFileManager(bufferCache, cFileRef);
         compressedFileManager.open();
         super.open(fileRef);
     }
@@ -173,23 +171,20 @@
     public void close() throws HyracksDataException {
         if (hasBeenOpened()) {
             compressedFileManager.close();
-            bufferCache.closeFile(compressedFileManager.getFileId());
         }
         super.close();
     }
 
     @Override
     public void purge() throws HyracksDataException {
+        compressedFileManager.purge();
         super.purge();
-        compressedFileManager.close();
-        bufferCache.closeFile(compressedFileManager.getFileId());
-        bufferCache.purgeHandle(compressedFileManager.getFileId());
     }
 
     @Override
     public void markAsDeleted() throws HyracksDataException {
         if (hasBeenOpened()) {
-            bufferCache.deleteFile(compressedFileManager.getFileId());
+            compressedFileManager.delete();
             compressedFileManager = null;
         } else {
             bufferCache.deleteFile(lafFileRef);
@@ -199,8 +194,8 @@
 
     @Override
     public void force(boolean metadata) throws HyracksDataException {
+        compressedFileManager.force(metadata);
         super.force(metadata);
-        bufferCache.force(compressedFileManager.getFileId(), metadata);
     }
 
     @Override