[ASTERIXDB-2786] Fix synchronization of GVBC
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- fix synchronization of GVBC's register and unregister
- fix the access of disk components in GVBCTest
- fix the error message printing to show the full stack
trace
- refactor GVBC to avoid checking primary indexes that are being
flushed
Change-Id: I97aba2139f013610649f7d6ec48fed5eca86a4de
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8384
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
index 2034aa8..abfe2ec 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -58,6 +58,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -153,21 +154,22 @@
Assert.fail();
}
for (int i = 0; i < NUM_PARTITIONS; i++) {
- Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
- Assert.assertTrue(
- primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
- .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
+ List<ILSMDiskComponent> diskComponents = new ArrayList<>(primaryIndexes[i].getDiskComponents());
+ Assert.assertFalse(diskComponents.isEmpty());
+ Assert.assertTrue(diskComponents.stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
+ .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
- Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
- Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
- .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
- .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+ List<ILSMDiskComponent> filteredDiskComponents =
+ new ArrayList<>(filteredPrimaryIndexes[i].getDiskComponents());
+ Assert.assertFalse(filteredDiskComponents.isEmpty());
+ Assert.assertTrue(filteredDiskComponents.stream().allMatch(c -> ((AbstractTreeIndex) c.getIndex())
+ .getFileReference().getFile().length() <= FILTERED_MEMORY_COMPONENT_SIZE));
}
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
} catch (Throwable e) {
- LOGGER.error(e);
+ LOGGER.error("testFlushes failed", e);
Assert.fail(e.getMessage());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index d8c76ab..f772038 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -107,7 +107,7 @@
public void register(ILSMMemoryComponent memoryComponent) {
ILSMIndex index = memoryComponent.getLsmIndex();
if (index.isPrimaryIndex()) {
- synchronized (primaryIndexes) {
+ synchronized (this) {
if (!primaryIndexes.contains(index)) {
// make sure only add index once
primaryIndexes.add(index);
@@ -134,7 +134,7 @@
public void unregister(ILSMMemoryComponent memoryComponent) {
ILSMIndex index = memoryComponent.getLsmIndex();
if (index.isPrimaryIndex()) {
- synchronized (primaryIndexes) {
+ synchronized (this) {
int pos = primaryIndexes.indexOf(index);
if (pos >= 0) {
primaryIndexes.remove(index);
@@ -485,43 +485,41 @@
int cycles = 0;
while (vbc.getUsage() >= flushPageBudget && cycles <= primaryIndexes.size()) {
// find the first modified memory component while avoiding infinite loops
- while (cycles <= primaryIndexes.size()
- && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
- flushPtr = (flushPtr + 1) % primaryIndexes.size();
- cycles++;
- }
-
ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
flushPtr = (flushPtr + 1) % primaryIndexes.size();
- // we need to manually flush this memory component because it may be idle at this point
- // note that this is different from flushing a filtered memory component
- PrimaryIndexOperationTracker opTracker =
- (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
- synchronized (opTracker) {
- boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
- if (flushable && !opTracker.isFlushLogCreated()) {
- // if the flush log has already been created, then we can simply wait for
- // that flush to complete
- ILSMMemoryComponent memoryComponent = primaryIndex.getCurrentMemoryComponent();
- if (memoryComponent.getState() == ComponentState.READABLE_WRITABLE) {
- // before we schedule the flush, mark the memory component as unwritable to prevent
- // future writers
- memoryComponent.setUnwritable();
- }
+ cycles++;
+ if (!primaryIndex.isCurrentMutableComponentEmpty() && !flushingIndexes.contains(primaryIndex)) {
+ // we need to manually flush this memory component because it may be idle at this point
+ // note that this is different from flushing a filtered memory component
+ PrimaryIndexOperationTracker opTracker =
+ (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
+ synchronized (opTracker) {
+ boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
+ if (flushable && !opTracker.isFlushLogCreated()) {
+ // if the flush log has already been created, then we can simply wait for
+ // that flush to complete
+ ILSMMemoryComponent memoryComponent = primaryIndex.getCurrentMemoryComponent();
+ if (memoryComponent.getState() == ComponentState.READABLE_WRITABLE) {
+ // before we schedule the flush, mark the memory component as unwritable to prevent
+ // future writers
+ memoryComponent.setUnwritable();
+ }
- opTracker.setFlushOnExit(true);
- opTracker.flushIfNeeded();
- // If the flush cannot be scheduled at this time, then there must be active writers.
- // The flush will be eventually scheduled when writers exit
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Requested flushing {} index {}",
- isMetadataIndex(primaryIndex) ? "metadata" : "primary", primaryIndex.toString());
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ // If the flush cannot be scheduled at this time, then there must be active writers.
+ // The flush will be eventually scheduled when writers exit
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("Requested flushing {} index {}",
+ isMetadataIndex(primaryIndex) ? "metadata" : "primary",
+ primaryIndex.toString());
+ }
}
- }
- if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
- // global vbc cannot wait on metadata indexes because metadata indexes support full
- // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
- return primaryIndex;
+ if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
+ // global vbc cannot wait on metadata indexes because metadata indexes support full
+ // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
+ return primaryIndex;
+ }
}
}
}