[ASTERIXDB-2730][STO] Optimize flush in GVBC
- user model changes: no
- storage format changes: no
- interface changes: yes. Introduce a new storage property:
storage.memorycomponent.max.concurrent.flushes (default 0)
Details:
- Introduce a new storage property to allow concurrent flushes
by GVBC. The default value is 0, which means that the flush concurency
will be the same as the number of NC partitions.
- Move cleaning up of a memory component out of the synchronization block
on op tracker because this may take a relatively long time (a full scan over
all GVBC pages).
- Introduce a minor fix to make sure the memory component is unwritable
before requesting flushing it by GVBC
Change-Id: Id8867fa3ac65da319723b804cc1e39dc8eb6bde5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6624
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 3cf57c5..e9da651 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -206,7 +206,15 @@
}
localResourceRepository.deleteStorageData();
}
- virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties);
+ int maxConcurrentFlushes = storageProperties.getMaxConcurrentFlushes();
+ if (maxConcurrentFlushes <= 0) {
+ maxConcurrentFlushes = ioManager.getIODevices().size();
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("The value of maxConcurrentFlushes is not provided. Setting maxConcurrentFlushes = {}.",
+ maxConcurrentFlushes);
+ }
+ }
+ virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties, maxConcurrentFlushes);
// Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
// the metadata bootstrap task
((ILifeCycleComponent) virtualBufferCache).start();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 958d5ae..ed0bf9a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.config;
import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
@@ -48,6 +49,7 @@
STORAGE_MEMORYCOMPONENT_PAGESIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(128, KILOBYTE)),
STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD(DOUBLE, 0.9d),
+ STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES(INTEGER, 0),
STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE(LONG_BYTE_UNIT, 0L),
STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
@@ -84,6 +86,9 @@
return "The page size in bytes for pages allocated to memory components";
case STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS:
return "The number of memory components to be used per lsm index";
+ case STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES:
+ return "The maximum number of concurrent flush operations. 0 means that the value will be "
+ + "calculated as the number of partitions";
case STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD:
return "The memory usage threshold when memory components should be flushed";
case STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE:
@@ -165,6 +170,10 @@
return (int) (getBufferCacheSize() / (getBufferCachePageSize() + IBufferCache.RESERVED_HEADER_BYTES));
}
+ public int getMaxConcurrentFlushes() {
+ return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_CONCURRENT_FLUSHES);
+ }
+
public long getJobExecutionMemoryBudget() {
final long jobExecutionMemory =
Runtime.getRuntime().maxMemory() - getBufferCacheSize() - getMemoryComponentGlobalBudget();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index 45594eb..d8c76ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -23,8 +23,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,6 +36,7 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -66,9 +69,11 @@
private final Int2ObjectMap<AtomicInteger> fileIdUsageMap =
Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+ private final int maxConcurrentFlushes;
private final List<ILSMIndex> primaryIndexes = new ArrayList<>();
+
+ private final Set<ILSMIndex> flushingIndexes = Collections.synchronizedSet(new HashSet<>());
private volatile int flushPtr;
- private volatile ILSMIndex flushingIndex;
private final int filteredMemoryComponentMaxNumPages;
private final int flushPageBudget;
@@ -76,7 +81,8 @@
private final AtomicBoolean isOpen = new AtomicBoolean(false);
private final FlushThread flushThread = new FlushThread();
- public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties) {
+ public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties,
+ int maxConcurrentFlushes) {
this.vbc = new VirtualBufferCache(allocator, storageProperties.getBufferCachePageSize(),
(int) (storageProperties.getMemoryComponentGlobalBudget()
/ storageProperties.getMemoryComponentPageSize()));
@@ -84,6 +90,7 @@
/ storageProperties.getMemoryComponentPageSize()
* storageProperties.getMemoryComponentFlushThreshold());
this.filteredMemoryComponentMaxNumPages = storageProperties.getFilteredMemoryComponentMaxNumPages();
+ this.maxConcurrentFlushes = maxConcurrentFlushes;
}
@Override
@@ -97,24 +104,26 @@
}
@Override
- public synchronized void register(ILSMMemoryComponent memoryComponent) {
+ public void register(ILSMMemoryComponent memoryComponent) {
ILSMIndex index = memoryComponent.getLsmIndex();
if (index.isPrimaryIndex()) {
- if (!primaryIndexes.contains(index)) {
- // make sure only add index once
- primaryIndexes.add(index);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Registered {} index {} to the global VBC",
- isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+ synchronized (primaryIndexes) {
+ if (!primaryIndexes.contains(index)) {
+ // make sure only add index once
+ primaryIndexes.add(index);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Registered {} index {} to the global VBC",
+ isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+ }
}
- }
- if (index.getNumOfFilterFields() > 0) {
- // handle filtered primary index
- AtomicInteger usage = new AtomicInteger();
- memoryComponentUsageMap.put(memoryComponent, usage);
- for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
- if (ref != null) {
- fileRefUsageMap.put(ref, usage);
+ if (index.getNumOfFilterFields() > 0) {
+ // handle filtered primary index
+ AtomicInteger usage = new AtomicInteger();
+ memoryComponentUsageMap.put(memoryComponent, usage);
+ for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
+ if (ref != null) {
+ fileRefUsageMap.put(ref, usage);
+ }
}
}
}
@@ -122,29 +131,31 @@
}
@Override
- public synchronized void unregister(ILSMMemoryComponent memoryComponent) {
+ public void unregister(ILSMMemoryComponent memoryComponent) {
ILSMIndex index = memoryComponent.getLsmIndex();
if (index.isPrimaryIndex()) {
- int pos = primaryIndexes.indexOf(index);
- if (pos >= 0) {
- primaryIndexes.remove(index);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Unregistered {} index {} to the global VBC",
- isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+ synchronized (primaryIndexes) {
+ int pos = primaryIndexes.indexOf(index);
+ if (pos >= 0) {
+ primaryIndexes.remove(index);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Unregistered {} index {} to the global VBC",
+ isMetadataIndex(index) ? "metadata" : "primary", index.toString());
+ }
+ if (primaryIndexes.isEmpty()) {
+ flushPtr = 0;
+ } else if (flushPtr > pos) {
+ // If the removed index is before flushPtr, we should decrement flushPtr by 1 so that
+ // it still points to the same index.
+ flushPtr = (flushPtr - 1) % primaryIndexes.size();
+ }
}
- if (primaryIndexes.isEmpty()) {
- flushPtr = 0;
- } else if (flushPtr > pos) {
- // If the removed index is before flushPtr, we should decrement flushPtr by 1 so that
- // it still points to the same index.
- flushPtr = (flushPtr - 1) % primaryIndexes.size();
- }
- }
- if (index.getNumOfFilterFields() > 0) {
- memoryComponentUsageMap.remove(memoryComponent);
- for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
- if (ref != null) {
- fileRefUsageMap.remove(ref);
+ if (index.getNumOfFilterFields() > 0) {
+ memoryComponentUsageMap.remove(memoryComponent);
+ for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
+ if (ref != null) {
+ fileRefUsageMap.remove(ref);
+ }
}
}
}
@@ -153,26 +164,19 @@
@Override
public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
- if (memoryComponent.getLsmIndex() == flushingIndex) {
+ if (flushingIndexes.remove(memoryComponent.getLsmIndex())) {
+ LOGGER.info("Completed flushing {}.", memoryComponent.getIndex());
+ // After the flush operation is completed, we may have 2 cases:
+ // 1. there is no active reader on this memory component and memory is reclaimed;
+ // 2. there are still some active readers and memory cannot be reclaimed.
+ // But for both cases, we will notify all primary index op trackers to let their writers retry,
+ // if they have been blocked. Moreover, we will check whether more flushes are needed.
synchronized (this) {
- if (memoryComponent.getLsmIndex() == flushingIndex) {
- flushingIndex = null;
- // After the flush operation is completed, we may have 2 cases:
- // 1. there is no active reader on this memory component and memory is reclaimed;
- // 2. there are still some active readers and memory cannot be reclaimed.
- // But for both cases, we will notify all primary index op trackers to let their writers retry,
- // if they have been blocked. Moreover, we will check whether more flushes are needed.
- final int size = primaryIndexes.size();
- for (int i = 0; i < size; i++) {
- ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
- synchronized (opTracker) {
- opTracker.notifyAll();
- }
- }
-
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Completed flushing {}. Resetting flushIndex back to null.",
- memoryComponent.getIndex().toString());
+ final int size = primaryIndexes.size();
+ for (int i = 0; i < size; i++) {
+ ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
+ synchronized (opTracker) {
+ opTracker.notifyAll();
}
}
}
@@ -200,7 +204,8 @@
@Override
public boolean isFull(ILSMMemoryComponent memoryComponent) {
- return memoryComponent.getLsmIndex() == flushingIndex || isFilteredMemoryComponentFull(memoryComponent);
+ return flushingIndexes.contains(memoryComponent.getLsmIndex())
+ || isFilteredMemoryComponentFull(memoryComponent);
}
private boolean isFilteredMemoryComponentFull(ILSMMemoryComponent memoryComponent) {
@@ -278,11 +283,7 @@
}
private void checkAndNotifyFlushThread() {
- if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
- // For better performance, we only flush one dataset partition at a time.
- // After reclaiming memory from this dataset partition, its memory can be used by other indexes.
- // Thus, given N dataset partitions, each dataset partition will approximately receive 2/N of
- // the total memory instead of 1/N, which doubles the memory utilization.
+ if (vbc.getUsage() < flushPageBudget) {
return;
}
// Notify the flush thread to schedule flushes. This is used to avoid deadlocks because page pins can be
@@ -470,48 +471,63 @@
}
private void scheduleFlush() throws HyracksDataException {
+ ILSMIndex selectedIndex = null;
synchronized (GlobalVirtualBufferCache.this) {
- int cycles = 0;
- while (vbc.getUsage() >= flushPageBudget && flushingIndex == null && cycles <= primaryIndexes.size()) {
- // find the first modified memory component while avoiding infinite loops
- while (cycles <= primaryIndexes.size()
- && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
- flushPtr = (flushPtr + 1) % primaryIndexes.size();
- cycles++;
- }
-
- ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
- flushPtr = (flushPtr + 1) % primaryIndexes.size();
- // we need to manually flush this memory component because it may be idle at this point
- // note that this is different from flushing a filtered memory component
- PrimaryIndexOperationTracker opTracker =
- (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
- synchronized (opTracker) {
- boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
- if (flushable && !opTracker.isFlushLogCreated()) {
- // if the flush log has already been created, then we can simply wait for
- // that flush to complete
- opTracker.setFlushOnExit(true);
- opTracker.flushIfNeeded();
- // If the flush cannot be scheduled at this time, then there must be active writers.
- // The flush will be eventually scheduled when writers exit
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Requested {} flushing primary index {}",
- isMetadataIndex(primaryIndex) ? "metadata" : "primary",
- primaryIndex.toString());
- }
- }
- if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
- // global vbc cannot wait on metadata indexes because metadata indexes support full
- // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
- flushingIndex = primaryIndex;
- LOGGER.debug("Waiting for flushing primary index {} to complete...", primaryIndex);
- break;
- }
- }
+ while (flushingIndexes.size() < maxConcurrentFlushes
+ && ((selectedIndex = selectFlushIndex()) != null)) {
+ LOGGER.debug("Waiting for flushing primary index {} to complete...", selectedIndex);
+ flushingIndexes.add(selectedIndex);
}
}
}
+
+ private ILSMIndex selectFlushIndex() throws HyracksDataException {
+ int cycles = 0;
+ while (vbc.getUsage() >= flushPageBudget && cycles <= primaryIndexes.size()) {
+ // find the first modified memory component while avoiding infinite loops
+ while (cycles <= primaryIndexes.size()
+ && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
+ flushPtr = (flushPtr + 1) % primaryIndexes.size();
+ cycles++;
+ }
+
+ ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
+ flushPtr = (flushPtr + 1) % primaryIndexes.size();
+ // we need to manually flush this memory component because it may be idle at this point
+ // note that this is different from flushing a filtered memory component
+ PrimaryIndexOperationTracker opTracker =
+ (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
+ synchronized (opTracker) {
+ boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
+ if (flushable && !opTracker.isFlushLogCreated()) {
+ // if the flush log has already been created, then we can simply wait for
+ // that flush to complete
+ ILSMMemoryComponent memoryComponent = primaryIndex.getCurrentMemoryComponent();
+ if (memoryComponent.getState() == ComponentState.READABLE_WRITABLE) {
+ // before we schedule the flush, mark the memory component as unwritable to prevent
+ // future writers
+ memoryComponent.setUnwritable();
+ }
+
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ // If the flush cannot be scheduled at this time, then there must be active writers.
+ // The flush will be eventually scheduled when writers exit
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Requested flushing {} index {}",
+ isMetadataIndex(primaryIndex) ? "metadata" : "primary", primaryIndex.toString());
+ }
+ }
+ if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
+ // global vbc cannot wait on metadata indexes because metadata indexes support full
+ // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
+ return primaryIndex;
+ }
+ }
+ }
+ return null;
+ }
+
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index fef59e7..186cabb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -95,9 +95,11 @@
* whether the operation failed
* @param isMutableComponent
* true if the thread intended to modify the component
+ * @return
+ * true if extra cleanup is needed for the component
* @throws HyracksDataException
*/
- void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+ boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
throws HyracksDataException;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 23b3634..69b9547 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -128,6 +128,10 @@
void addInactiveDiskComponent(ILSMDiskComponent diskComponent);
+ List<ILSMMemoryComponent> getInactiveMemoryComponents();
+
+ void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent);
+
boolean isCurrentMutableComponentEmpty() throws HyracksDataException;
void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index 064ab64..70b1bd3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -42,13 +42,20 @@
int getWriterCount();
/**
- * Clear the component and its metadata page completely
+ * Reset the memory component's state after the flush completes
*
* @throws HyracksDataException
*/
void reset() throws HyracksDataException;
/**
+ * Cleanup the memory component after flush (can be time consuming)
+ *
+ * @throws HyracksDataException
+ */
+ void cleanup() throws HyracksDataException;
+
+ /**
* @return true if there are data in the memory component, false otherwise
*/
boolean isModified();
@@ -105,6 +112,7 @@
/**
* Called when the memory component is flushed to disk
+ *
* @throws HyracksDataException
*/
void flushed() throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 0ad3029..66fafec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -92,7 +92,7 @@
}
@Override
- public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+ public boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
throws HyracksDataException {
switch (opType) {
case MERGE:
@@ -122,6 +122,7 @@
if (readerCount <= -1) {
throw new IllegalStateException("Invalid LSM disk component readerCount: " + readerCount);
}
+ return false;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 6c7b1f8..a5ff2e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -25,7 +25,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -94,6 +93,7 @@
// components with lower indexes are newer than components with higher index
protected final List<ILSMDiskComponent> diskComponents;
protected final List<ILSMDiskComponent> inactiveDiskComponents;
+ protected final List<ILSMMemoryComponent> inactiveMemoryComponents;
protected final double bloomFilterFalsePositiveRate;
protected final IComponentFilterHelper filterHelper;
protected final ILSMComponentFilterFrameFactory filterFrameFactory;
@@ -135,7 +135,8 @@
this.filterManager = filterManager;
this.treeFields = treeFields;
this.filterFields = filterFields;
- this.inactiveDiskComponents = new LinkedList<>();
+ this.inactiveDiskComponents = new ArrayList<>();
+ this.inactiveMemoryComponents = new ArrayList<>();
this.durable = durable;
this.tracer = tracer;
lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker, diskBufferCache.isReplicationEnabled(),
@@ -170,8 +171,9 @@
lsmHarness = new ExternalIndexHarness(this, ioScheduler, mergePolicy, opTracker,
diskBufferCache.isReplicationEnabled());
isActive = false;
- diskComponents = new LinkedList<>();
- this.inactiveDiskComponents = new LinkedList<>();
+ diskComponents = new ArrayList<>();
+ this.inactiveDiskComponents = new ArrayList<>();
+ this.inactiveMemoryComponents = new ArrayList<>();
// Memory related objects are nulled
virtualBufferCaches = null;
memoryComponents = null;
@@ -300,6 +302,7 @@
private void resetMemoryComponents() throws HyracksDataException {
if (memoryComponentsAllocated && memoryComponents != null) {
for (ILSMMemoryComponent c : memoryComponents) {
+ c.cleanup();
c.reset();
}
}
@@ -700,6 +703,16 @@
}
@Override
+ public List<ILSMMemoryComponent> getInactiveMemoryComponents() {
+ return inactiveMemoryComponents;
+ }
+
+ @Override
+ public void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent) {
+ inactiveMemoryComponents.add(memoryComponent);
+ }
+
+ @Override
public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents,
ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
//get set of files to be replicated for this component
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index a6c82bc..3b6667e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -151,8 +151,9 @@
}
@Override
- public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+ public boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
throws HyracksDataException {
+ boolean cleanup = false;
switch (opType) {
case FORCE_MODIFICATION:
case MODIFICATION:
@@ -168,14 +169,14 @@
} else {
readerCount--;
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
- reset();
+ cleanup = true;
}
}
break;
case SEARCH:
readerCount--;
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
- reset();
+ cleanup = true;
}
break;
case FLUSH:
@@ -183,25 +184,22 @@
throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state);
}
readerCount--;
- if (failedOperation) {
+ if (!failedOperation) {
// If flush failed, keep the component state to READABLE_UNWRITABLE_FLUSHING
- return;
- }
- // operation succeeded
- if (readerCount == 0) {
- // TODO: move reset() outside of the synchronized block (on op tracker)
- reset();
- } else {
+ // operation succeeded
state = ComponentState.UNREADABLE_UNWRITABLE;
+ if (readerCount == 0) {
+ cleanup = true;
+ }
}
break;
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
-
if (readerCount <= -1 || writerCount <= -1) {
throw new IllegalStateException("Invalid reader or writer count " + readerCount + " - " + writerCount);
}
+ return cleanup;
}
@Override
@@ -235,7 +233,6 @@
if (filter != null) {
filter.reset();
}
- doReset();
lsmIndex.memoryComponentsReset();
// a flush can be pending on a component that just completed its flush... here is when this can happen:
// primary index has 2 components, secondary index has 2 components.
@@ -248,7 +245,8 @@
}
}
- protected void doReset() throws HyracksDataException {
+ @Override
+ public void cleanup() throws HyracksDataException {
getIndex().deactivate();
getIndex().destroy();
getIndex().create();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java
index 373d7e7..882b4bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMWithBuddyMemoryComponent.java
@@ -33,8 +33,8 @@
public abstract AbstractTreeIndex getBuddyIndex();
@Override
- public void doReset() throws HyracksDataException {
- super.doReset();
+ public void cleanup() throws HyracksDataException {
+ super.cleanup();
getBuddyIndex().deactivate();
getBuddyIndex().destroy();
getBuddyIndex().create();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 9a6fb43..3ea0f49 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -46,7 +46,7 @@
}
@Override
- public void threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
+ public boolean threadExit(LSMOperationType opType, boolean failedOperation, boolean isMutableComponent)
throws HyracksDataException {
throw HyracksDataException.create(ErrorCode.ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 20a2555..92fa135 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -20,7 +20,6 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
@@ -109,7 +108,7 @@
return false;
}
try {
- opTracker.wait();
+ opTracker.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
@@ -176,8 +175,8 @@
if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) {
return;
}
- List<ILSMDiskComponent> inactiveDiskComponents;
List<ILSMDiskComponent> inactiveDiskComponentsToBeDeleted = null;
+ List<ILSMMemoryComponent> inactiveMemoryComponentsToBeCleanedUp = null;
try {
synchronized (opTracker) {
try {
@@ -220,12 +219,12 @@
* and not anymore accessed.
* This cleanup is done outside of optracker synchronized block.
*/
- inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
+ List<ILSMDiskComponent> inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
if (!inactiveDiskComponents.isEmpty()) {
for (ILSMDiskComponent inactiveComp : inactiveDiskComponents) {
if (inactiveComp.getFileReferenceCount() == 1) {
inactiveDiskComponentsToBeDeleted = inactiveDiskComponentsToBeDeleted == null
- ? new LinkedList<>() : inactiveDiskComponentsToBeDeleted;
+ ? new ArrayList<>() : inactiveDiskComponentsToBeDeleted;
inactiveDiskComponentsToBeDeleted.add(inactiveComp);
}
}
@@ -233,6 +232,11 @@
inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
}
}
+ List<ILSMMemoryComponent> inactiveMemoryComponents = lsmIndex.getInactiveMemoryComponents();
+ if (!inactiveMemoryComponents.isEmpty()) {
+ inactiveMemoryComponentsToBeCleanedUp = new ArrayList<>(inactiveMemoryComponents);
+ inactiveMemoryComponents.clear();
+ }
}
}
} finally {
@@ -256,6 +260,21 @@
throw e; // NOSONAR: The last call in the finally clause
}
}
+ if (inactiveMemoryComponentsToBeCleanedUp != null) {
+ for (ILSMMemoryComponent c : inactiveMemoryComponentsToBeCleanedUp) {
+ tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString());
+ c.cleanup();
+ synchronized (opTracker) {
+ c.reset();
+ // Notify all waiting threads whenever the mutable component's state
+ // has changed to inactive. This is important because even though we switched
+ // the mutable components, it is possible that the component that we just
+ // switched to is still busy flushing its data to disk. Thus, the notification
+ // that was issued upon scheduling the flush is not enough.
+ opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
+ }
+ }
+ }
if (opType == LSMOperationType.FLUSH) {
ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
// We must call flushed without synchronizing on opTracker to avoid deadlocks
@@ -309,26 +328,16 @@
for (int i = 0; i < componentsCount; i++) {
final ILSMComponent c = componentHolder.get(i);
boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY;
- c.threadExit(opType, failedOperation, isMutableComponent);
+ boolean needsCleanup = c.threadExit(opType, failedOperation, isMutableComponent);
if (c.getType() == LSMComponentType.MEMORY) {
- switch (c.getState()) {
- case READABLE_UNWRITABLE:
- if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
- || opType == LSMOperationType.FORCE_MODIFICATION)) {
- lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
- }
- break;
- case INACTIVE:
- tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString());
- // Notify all waiting threads whenever the mutable component's state
- // has changed to inactive. This is important because even though we switched
- // the mutable components, it is possible that the component that we just
- // switched to is still busy flushing its data to disk. Thus, the notification
- // that was issued upon scheduling the flush is not enough.
- opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
- break;
- default:
- break;
+ if (c.getState() == ComponentState.READABLE_UNWRITABLE) {
+ if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
+ || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
+ }
+ }
+ if (needsCleanup) {
+ lsmIndex.addInactiveMemoryComponent((ILSMMemoryComponent) c);
}
} else if (c.getState() == ComponentState.INACTIVE) {
lsmIndex.addInactiveDiskComponent((AbstractLSMDiskComponent) c);