[ASTERIXDB-3431][STO] Move index cleanup to IO scheduler
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Clean up inactive memory components before inactive disk
components.
- Halt on unexpected failures while cleaning up the inactive
memory components to avoid getting stuck on an invalid component
state indefinitely.
- Move destroying inactive disk components to the IO scheduler
similar to other LSM IO operations.
Change-Id: I0b1a8f7b8ce9cf7a44593fa6d45eb552ac029796
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18375
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index bb689ac..329ccaf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -41,8 +41,18 @@
@Override
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.error("Operation {} has failed", operation, t);
- if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) {
+ if (haltOnFailure(operation)) {
ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
}
}
+
+ private boolean haltOnFailure(ILSMIOOperation operation) {
+ switch (operation.getIOOpertionType()) {
+ case CLEANUP:
+ case REPLICATE:
+ return false;
+ default:
+ return true;
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 40998b7..822df6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -38,7 +38,8 @@
MERGE,
LOAD,
NOOP,
- REPLICATE
+ REPLICATE,
+ CLEANUP
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 0869fed..43227ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -229,4 +229,11 @@
*/
IIndexDiskCacheManager getDiskCacheManager();
+ /**
+ * Schedules a cleanup opeartion on the components {@code inactiveDiskComponents}
+ *
+ * @param inactiveDiskComponents
+ * @throws HyracksDataException
+ */
+ void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index 049da38..3a8e558 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -62,6 +62,9 @@
case REPLICATE:
scheduleReplicate(operation);
break;
+ case CLEANUP:
+ scheduleCleanup(operation);
+ break;
case NOOP:
break;
default:
@@ -83,7 +86,7 @@
case REPLICATE:
completeReplicate(operation);
break;
- case NOOP:
+ case CLEANUP, NOOP:
return;
default:
// this should never happen
@@ -169,6 +172,10 @@
}
}
+ private void scheduleCleanup(ILSMIOOperation operation) {
+ executor.submit(operation);
+ }
+
private void completeReplicate(ILSMIOOperation operation) {
String id = operation.getIndexIdentifier();
synchronized (executor) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3656e4f..f7a15fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -118,6 +118,7 @@
private final boolean atomic;
private final List<ILSMDiskComponent> temporaryDiskComponents;
private final ILSMMergePolicy mergePolicy;
+ private final ILSMIOOperationScheduler ioScheduler;
public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches,
IBufferCache diskBufferCache, ILSMIndexFileManager fileManager, double bloomFilterFalsePositiveRate,
@@ -148,6 +149,7 @@
this.atomic = atomic;
this.temporaryDiskComponents = new ArrayList<>();
this.mergePolicy = mergePolicy;
+ this.ioScheduler = ioScheduler;
fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
@@ -450,6 +452,13 @@
return mergeOp;
}
+ @Override
+ public void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException {
+ LSMCleanupOperation cleanupOperation = new LSMCleanupOperation(this, ioOpCallback, inactiveDiskComponents);
+ ioOpCallback.scheduled(cleanupOperation);
+ ioScheduler.scheduleOperation(cleanupOperation);
+ }
+
private static void propagateMap(ILSMIndexOperationContext src, ILSMIndexOperationContext destination) {
Map<String, Object> map = src.getParameters();
if (map != null && !map.isEmpty()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
index 2a48627..7e6e70b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IoOperationExecutor.java
@@ -74,12 +74,12 @@
fail(executedOp, t != null ? t : executedOp.getFailure());
}
if (!failed || executedOp.getIOOpertionType() != LSMIOOperationType.FLUSH) {
- executedOp.complete(); // destroy if merge or successful flush
+ executedOp.complete(); // destroy if merge, cleanup, or successful flush
}
scheduler.completeOperation(executedOp);
}
- private void fail(ILSMIOOperation executedOp, Throwable t) throws HyracksDataException {
+ private void fail(ILSMIOOperation executedOp, Throwable t) {
callback.operationFailed(executedOp, t);
if (executedOp.getIOOpertionType() == LSMIOOperationType.FLUSH) {
executedOp.complete();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMCleanupOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMCleanupOperation.java
new file mode 100644
index 0000000..3eb86b3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMCleanupOperation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class LSMCleanupOperation extends AbstractIoOperation {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final List<ILSMDiskComponent> inactiveDiskComponents;
+
+ public LSMCleanupOperation(ILSMIndex index, ILSMIOOperationCallback ioOpCallback,
+ List<ILSMDiskComponent> inactiveDiskComponents) {
+ super(null, null, ioOpCallback, index.getIndexIdentifier());
+ this.inactiveDiskComponents = inactiveDiskComponents;
+ }
+
+ @Override
+ public ILSMIOOperation.LSMIOOperationType getIOOpertionType() {
+ return LSMIOOperationType.CLEANUP;
+ }
+
+ @Override
+ public ILSMIOOperation.LSMIOOperationStatus call() throws HyracksDataException {
+ try {
+ LOGGER.debug("started cleanup operation on index {} to destroy components {}", getIndexIdentifier(),
+ inactiveDiskComponents);
+ for (ILSMDiskComponent diskComponent : inactiveDiskComponents) {
+ diskComponent.deactivateAndDestroy();
+ }
+ LOGGER.debug("completed cleanup operation on index {} to destroy components {}", getIndexIdentifier(),
+ inactiveDiskComponents);
+ return ILSMIOOperation.LSMIOOperationStatus.SUCCESS;
+ } catch (Exception e) {
+ setFailure(e);
+ return LSMIOOperationStatus.FAILURE;
+ }
+ }
+
+ @Override
+ public long getRemainingPages() {
+ return 0;
+ }
+
+ @Override
+ protected LSMComponentFileReferences getComponentFiles() {
+ return null;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 28567c0..182887e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import static org.apache.hyracks.util.ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -55,6 +57,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.annotations.CriticalPath;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.ITracer.Scope;
@@ -244,57 +247,36 @@
}
}
if (inactiveDiskComponentsToBeDeleted != null) {
+ // note: the disk components have been removed before they were destroyed
inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
}
}
List<ILSMMemoryComponent> inactiveMemoryComponents = lsmIndex.getInactiveMemoryComponents();
if (!inactiveMemoryComponents.isEmpty()) {
inactiveMemoryComponentsToBeCleanedUp = new ArrayList<>(inactiveMemoryComponents);
+ // note: the inactive memory components have been cleared before they were cleaned up
inactiveMemoryComponents.clear();
}
}
}
} finally {
- /*
- * cleanup inactive disk components if any
- */
- if (inactiveDiskComponentsToBeDeleted != null) {
- try {
- //schedule a replication job to delete these inactive disk components from replicas
- if (replicationEnabled) {
- lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted,
- ReplicationOperation.DELETE, opType);
- }
- for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) {
- c.deactivateAndDestroy();
- }
- } catch (Throwable e) { // NOSONAR Log and re-throw
- if (LOGGER.isWarnEnabled()) {
- LOGGER.log(Level.WARN, "Failure scheduling replication or destroying merged component", e);
- }
- throw e; // NOSONAR: The last call in the finally clause
- }
- }
+ // the memory components clean up must be done first to avoid any unexpected exceptions during the rest
+ // of the finally block
if (inactiveMemoryComponentsToBeCleanedUp != null) {
- for (ILSMMemoryComponent c : inactiveMemoryComponentsToBeCleanedUp) {
- tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex::toString);
- c.cleanup();
- synchronized (opTracker) {
- c.reset();
- // Notify all waiting threads whenever the mutable component's state
- // has changed to inactive. This is important because even though we switched
- // the mutable components, it is possible that the component that we just
- // switched to is still busy flushing its data to disk. Thus, the notification
- // that was issued upon scheduling the flush is not enough.
- opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
- }
- }
+ cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp);
}
if (opType == LSMOperationType.FLUSH) {
ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
// We must call flushed without synchronizing on opTracker to avoid deadlocks
flushingComponent.flushed();
}
+ if (inactiveDiskComponentsToBeDeleted != null) {
+ if (replicationEnabled) {
+ lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, ReplicationOperation.DELETE,
+ opType);
+ }
+ lsmIndex.scheduleCleanup(inactiveDiskComponentsToBeDeleted);
+ }
}
}
@@ -931,4 +913,32 @@
}
}
}
+
+ /**
+ * Resets the memory state of {@code inactiveMemoryComponents}. This method should not throw any exceptions.
+ * To account for cases where unexpected exceptions are thrown, we halt to avoid getting stuck with a bad in-memory
+ * state.
+ *
+ * @param inactiveMemoryComponents
+ */
+ private void cleanupInactiveMemoryComponents(List<ILSMMemoryComponent> inactiveMemoryComponents) {
+ try {
+ for (ILSMMemoryComponent c : inactiveMemoryComponents) {
+ tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex::toString);
+ c.cleanup();
+ synchronized (opTracker) {
+ c.reset();
+ // Notify all waiting threads whenever the mutable component's state
+ // has changed to inactive. This is important because even though we switched
+ // the mutable components, it is possible that the component that we just
+ // switched to is still busy flushing its data to disk. Thus, the notification
+ // that was issued upon scheduling the flush is not enough.
+ opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.fatal("unexpected error while cleaning up inactive memory components", e);
+ ExitUtil.halt(EC_INCONSISTENT_STORAGE_REFERENCES);
+ }
+ }
}