[ASTERIXDB-3431][STO] Move index cleanup to IO scheduler

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Clean up inactive memory components before inactive disk
  components.
- Halt on unexpected failures while cleaning up the inactive
  memory components to avoid getting stuck on an invalid component
  state indefinitely.
- Move destroying inactive disk components to the IO scheduler
  similar to other LSM IO operations.

Change-Id: I0b1a8f7b8ce9cf7a44593fa6d45eb552ac029796
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18375
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index bb689ac..329ccaf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -41,8 +41,18 @@
     @Override
     public void operationFailed(ILSMIOOperation operation, Throwable t) {
         LOGGER.error("Operation {} has failed", operation, t);
-        if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) {
+        if (haltOnFailure(operation)) {
             ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
         }
     }
+
+    private boolean haltOnFailure(ILSMIOOperation operation) {
+        switch (operation.getIOOpertionType()) {
+            case CLEANUP:
+            case REPLICATE:
+                return false;
+            default:
+                return true;
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 40998b7..822df6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -38,7 +38,8 @@
         MERGE,
         LOAD,
         NOOP,
-        REPLICATE
+        REPLICATE,
+        CLEANUP
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 0869fed..43227ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -229,4 +229,11 @@
      */
     IIndexDiskCacheManager getDiskCacheManager();
 
+    /**
+     * Schedules a cleanup opeartion on the components {@code inactiveDiskComponents}
+     *
+     * @param inactiveDiskComponents
+     * @throws HyracksDataException
+     */
+    void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index 049da38..3a8e558 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -62,6 +62,9 @@
             case REPLICATE:
                 scheduleReplicate(operation);
                 break;
+            case CLEANUP:
+                scheduleCleanup(operation);
+                break;
             case NOOP:
                 break;
             default:
@@ -83,7 +86,7 @@
             case REPLICATE:
                 completeReplicate(operation);
                 break;
-            case NOOP:
+            case CLEANUP, NOOP:
                 return;
             default:
                 // this should never happen
@@ -169,6 +172,10 @@
         }
     }
 
+    private void scheduleCleanup(ILSMIOOperation operation) {
+        executor.submit(operation);
+    }
+
     private void completeReplicate(ILSMIOOperation operation) {
         String id = operation.getIndexIdentifier();
         synchronized (executor) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3656e4f..f7a15fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -118,6 +118,7 @@
     private final boolean atomic;
     private final List<ILSMDiskComponent> temporaryDiskComponents;
     private final ILSMMergePolicy mergePolicy;
+    private final ILSMIOOperationScheduler ioScheduler;
 
     public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
             IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
@@ -148,6 +149,7 @@
         this.atomic = atomic;
         this.temporaryDiskComponents = new ArrayList<>();
         this.mergePolicy = mergePolicy;
+        this.ioScheduler = ioScheduler;
 
         fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
         lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
@@ -450,6 +452,13 @@
         return mergeOp;
     }
 
+    @Override
+    public void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException {
+        LSMCleanupOperation cleanupOperation = new LSMCleanupOperation(this, ioOpCallback, inactiveDiskComponents);
+        ioOpCallback.scheduled(cleanupOperation);
+        ioScheduler.scheduleOperation(cleanupOperation);
+    }
+
     private static void propagateMap(ILSMIndexOperationContext src, ILSMIndexOperationContext destination) {
         Map<String, Object> map = src.getParameters();
         if (map != null && !map.isEmpty()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
index 2a48627..7e6e70b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
@@ -74,12 +74,12 @@
             fail(executedOp, t != null ? t : executedOp.getFailure());
         }
         if (!failed || executedOp.getIOOpertionType() != LSMIOOperationType.FLUSH) {
-            executedOp.complete(); // destroy if merge or successful flush
+            executedOp.complete(); // destroy if merge, cleanup, or successful flush
         }
         scheduler.completeOperation(executedOp);
     }
 
-    private void fail(ILSMIOOperation executedOp, Throwable t) throws HyracksDataException {
+    private void fail(ILSMIOOperation executedOp, Throwable t) {
         callback.operationFailed(executedOp, t);
         if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             executedOp.complete();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMCleanupOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMCleanupOperation.java
new file mode 100644
index 0000000..3eb86b3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMCleanupOperation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class LSMCleanupOperation extends AbstractIoOperation {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final List<ILSMDiskComponent> inactiveDiskComponents;
+
+    public LSMCleanupOperation(ILSMIndex index, ILSMIOOperationCallback ioOpCallback,
+            List<ILSMDiskComponent> inactiveDiskComponents) {
+        super(null, null, ioOpCallback, index.getIndexIdentifier());
+        this.inactiveDiskComponents = inactiveDiskComponents;
+    }
+
+    @Override
+    public ILSMIOOperation.LSMIOOperationType getIOOpertionType() {
+        return LSMIOOperationType.CLEANUP;
+    }
+
+    @Override
+    public ILSMIOOperation.LSMIOOperationStatus call() throws HyracksDataException {
+        try {
+            LOGGER.debug("started cleanup operation on index {} to destroy components {}", getIndexIdentifier(),
+                    inactiveDiskComponents);
+            for (ILSMDiskComponent diskComponent : inactiveDiskComponents) {
+                diskComponent.deactivateAndDestroy();
+            }
+            LOGGER.debug("completed cleanup operation on index {} to destroy components {}", getIndexIdentifier(),
+                    inactiveDiskComponents);
+            return ILSMIOOperation.LSMIOOperationStatus.SUCCESS;
+        } catch (Exception e) {
+            setFailure(e);
+            return LSMIOOperationStatus.FAILURE;
+        }
+    }
+
+    @Override
+    public long getRemainingPages() {
+        return 0;
+    }
+
+    @Override
+    protected LSMComponentFileReferences getComponentFiles() {
+        return null;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 28567c0..182887e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -19,6 +19,8 @@
 
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import static org.apache.hyracks.util.ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -55,6 +57,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.annotations.CriticalPath;
 import org.apache.hyracks.util.trace.ITracer;
 import org.apache.hyracks.util.trace.ITracer.Scope;
@@ -244,57 +247,36 @@
                             }
                         }
                         if (inactiveDiskComponentsToBeDeleted != null) {
+                            // note: the disk components have been removed before they were destroyed
                             inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
                         }
                     }
                     List<ILSMMemoryComponent> inactiveMemoryComponents = lsmIndex.getInactiveMemoryComponents();
                     if (!inactiveMemoryComponents.isEmpty()) {
                         inactiveMemoryComponentsToBeCleanedUp = new ArrayList<>(inactiveMemoryComponents);
+                        // note: the inactive memory components have been cleared before they were cleaned up
                         inactiveMemoryComponents.clear();
                     }
                 }
             }
         } finally {
-            /*
-             * cleanup inactive disk components if any
-             */
-            if (inactiveDiskComponentsToBeDeleted != null) {
-                try {
-                    //schedule a replication job to delete these inactive disk components from replicas
-                    if (replicationEnabled) {
-                        lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted,
-                                ReplicationOperation.DELETE, opType);
-                    }
-                    for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) {
-                        c.deactivateAndDestroy();
-                    }
-                } catch (Throwable e) { // NOSONAR Log and re-throw
-                    if (LOGGER.isWarnEnabled()) {
-                        LOGGER.log(Level.WARN, "Failure scheduling replication or destroying merged component", e);
-                    }
-                    throw e; // NOSONAR: The last call in the finally clause
-                }
-            }
+            // the memory components clean up must be done first to avoid any unexpected exceptions during the rest
+            // of the finally block
             if (inactiveMemoryComponentsToBeCleanedUp != null) {
-                for (ILSMMemoryComponent c : inactiveMemoryComponentsToBeCleanedUp) {
-                    tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex::toString);
-                    c.cleanup();
-                    synchronized (opTracker) {
-                        c.reset();
-                        // Notify all waiting threads whenever the mutable component's state
-                        // has changed to inactive. This is important because even though we switched
-                        // the mutable components, it is possible that the component that we just
-                        // switched to is still busy flushing its data to disk. Thus, the notification
-                        // that was issued upon scheduling the flush is not enough.
-                        opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
-                    }
-                }
+                cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp);
             }
             if (opType == LSMOperationType.FLUSH) {
                 ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
                 // We must call flushed without synchronizing on opTracker to avoid deadlocks
                 flushingComponent.flushed();
             }
+            if (inactiveDiskComponentsToBeDeleted != null) {
+                if (replicationEnabled) {
+                    lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, ReplicationOperation.DELETE,
+                            opType);
+                }
+                lsmIndex.scheduleCleanup(inactiveDiskComponentsToBeDeleted);
+            }
         }
     }
 
@@ -931,4 +913,32 @@
             }
         }
     }
+
+    /**
+     * Resets the memory state of {@code inactiveMemoryComponents}. This method should not throw any exceptions.
+     * To account for cases where unexpected exceptions are thrown, we halt to avoid getting stuck with a bad in-memory
+     * state.
+     *
+     * @param inactiveMemoryComponents
+     */
+    private void cleanupInactiveMemoryComponents(List<ILSMMemoryComponent> inactiveMemoryComponents) {
+        try {
+            for (ILSMMemoryComponent c : inactiveMemoryComponents) {
+                tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex::toString);
+                c.cleanup();
+                synchronized (opTracker) {
+                    c.reset();
+                    // Notify all waiting threads whenever the mutable component's state
+                    // has changed to inactive. This is important because even though we switched
+                    // the mutable components, it is possible that the component that we just
+                    // switched to is still busy flushing its data to disk. Thus, the notification
+                    // that was issued upon scheduling the flush is not enough.
+                    opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.fatal("unexpected error while cleaning up inactive memory components", e);
+            ExitUtil.halt(EC_INCONSISTENT_STORAGE_REFERENCES);
+        }
+    }
 }