[NO ISSUE][TX] Fix Concurrent Access in TransactionContext
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ensure all access to TransactionContext is thread safe.
Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2875
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 95cabf9..a0944ea 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -36,7 +36,7 @@
public abstract class AbstractTransactionContext implements ITransactionContext {
protected final TxnId txnId;
- protected final Map<Long, ITransactionOperationTracker> txnOpTrackers;
+ private final Map<Long, ITransactionOperationTracker> txnOpTrackers;
private final AtomicLong firstLSN;
private final AtomicLong lastLSN;
private final AtomicInteger txnState;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 079e99a..083c26b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -35,9 +35,9 @@
@ThreadSafe
public class AtomicTransactionContext extends AbstractTransactionContext {
- private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>();
- private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>();
- private final Map<Long, IModificationOperationCallback> callbacks = new HashMap<>();
+ private final Map<Long, ILSMOperationTracker> opTrackers = new ConcurrentHashMap<>();
+ private final Map<Long, AtomicInteger> indexPendingOps = new ConcurrentHashMap<>();
+ private final Map<Long, IModificationOperationCallback> callbacks = new ConcurrentHashMap<>();
public AtomicTransactionContext(TxnId txnId) {
super(txnId);
@@ -47,7 +47,7 @@
public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
boolean primaryIndex) {
super.register(resourceId, partition, index, callback, primaryIndex);
- synchronized (txnOpTrackers) {
+ synchronized (opTrackers) {
if (primaryIndex && !opTrackers.containsKey(resourceId)) {
opTrackers.put(resourceId, index.getOperationTracker());
callbacks.put(resourceId, callback);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9fcb08b..188bb1b3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.transaction;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
@@ -42,23 +42,21 @@
public EntityLevelTransactionContext(TxnId txnId) {
super(txnId);
- this.primaryIndexTrackers = new HashMap<>();
- this.resourcePendingOps = new HashMap<>();
- this.partitionPendingOps = new HashMap<>();
+ this.primaryIndexTrackers = new ConcurrentHashMap<>();
+ this.resourcePendingOps = new ConcurrentHashMap<>();
+ this.partitionPendingOps = new ConcurrentHashMap<>();
}
@Override
public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
boolean primaryIndex) {
super.register(resourceId, partition, index, callback, primaryIndex);
- synchronized (txnOpTrackers) {
- AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
- resourcePendingOps.put(resourceId, pendingOps);
- if (primaryIndex) {
- Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
- new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
- primaryIndexTrackers.put(partition, pair);
- }
+ AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+ resourcePendingOps.put(resourceId, pendingOps);
+ if (primaryIndex) {
+ Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+ new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+ primaryIndexTrackers.put(partition, pair);
}
}