[NO ISSUE][TXN] Prevent deadlock in Metadata transactions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Flushes in metadata datasets are triggerred by entity update
logs, unlike regular transactions where flushes are triggerred
by entity commit logs.
- Because entity update logs can be writting to disk before the
operation completes, there is a chance that an operation that
caused the component to be full exits after the log is flushed
and so, a flush operation is not scheduled.
- This change proposes a simple fix. The fix is that metadata
operation will also check if a flush is needed and will schedule
one if needed.
Change-Id: I07a18840dc54fe052b7bd294595f816f6d8a4d2f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2413
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 1a76b66..f7f2806 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -86,17 +86,19 @@
throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
- if (numActiveOperations.get() == 0) {
- flushIfRequested();
- } else if (numActiveOperations.get() < 0) {
- throw new HyracksDataException("The number of active operations cannot be negative!");
- }
+ flushIfNeeded();
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
|| opType == LSMOperationType.REPLICATE) {
dsInfo.undeclareActiveIOOperation();
}
}
+ public synchronized void flushIfNeeded() throws HyracksDataException {
+ if (numActiveOperations.get() == 0) {
+ flushIfRequested();
+ }
+ }
+
public void flushIfRequested() throws HyracksDataException {
// If we need a flush, and this is the last completing operation, then schedule the flush,
// or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
@@ -177,6 +179,9 @@
//modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
if (modificationCallback != NoOpOperationCallback.INSTANCE) {
numActiveOperations.decrementAndGet();
+ if (numActiveOperations.get() < 0) {
+ throw new IllegalStateException("The number of active operations cannot be negative!");
+ }
((AbstractOperationCallback) modificationCallback).afterOperation();
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 64d0389..f81d7da 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
@@ -347,7 +348,6 @@
DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(true);
ITupleReference datasetTuple = tupleReaderWriter.getTupleFromMetadataEntity(dataset);
insertTupleIntoIndex(txnId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple);
-
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
// Add the primary index for the dataset.
InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
@@ -501,6 +501,8 @@
default:
throw new IllegalStateException("Unknown operation type: " + op);
}
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
+ opTracker.flushIfNeeded(); // there is a window where the flush is not triggerred after an operation
} finally {
datasetLifecycleManager.close(resourceName);
}