[NO ISSUE][TX] Fix Concurrent Access in TransactionContext

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure all access to TransactionContext is thread safe.

Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2875
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 95cabf9..a0944ea 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -36,7 +36,7 @@
 public abstract class AbstractTransactionContext implements ITransactionContext {
 
     protected final TxnId txnId;
-    protected final Map<Long, ITransactionOperationTracker> txnOpTrackers;
+    private final Map<Long, ITransactionOperationTracker> txnOpTrackers;
     private final AtomicLong firstLSN;
     private final AtomicLong lastLSN;
     private final AtomicInteger txnState;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 079e99a..083c26b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -35,9 +35,9 @@
 @ThreadSafe
 public class AtomicTransactionContext extends AbstractTransactionContext {
 
-    private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>();
-    private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>();
-    private final Map<Long, IModificationOperationCallback> callbacks = new HashMap<>();
+    private final Map<Long, ILSMOperationTracker> opTrackers = new ConcurrentHashMap<>();
+    private final Map<Long, AtomicInteger> indexPendingOps = new ConcurrentHashMap<>();
+    private final Map<Long, IModificationOperationCallback> callbacks = new ConcurrentHashMap<>();
 
     public AtomicTransactionContext(TxnId txnId) {
         super(txnId);
@@ -47,7 +47,7 @@
     public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
         super.register(resourceId, partition, index, callback, primaryIndex);
-        synchronized (txnOpTrackers) {
+        synchronized (opTrackers) {
             if (primaryIndex && !opTrackers.containsKey(resourceId)) {
                 opTrackers.put(resourceId, index.getOperationTracker());
                 callbacks.put(resourceId, callback);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9fcb08b..188bb1b3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
@@ -42,23 +42,21 @@
 
     public EntityLevelTransactionContext(TxnId txnId) {
         super(txnId);
-        this.primaryIndexTrackers = new HashMap<>();
-        this.resourcePendingOps = new HashMap<>();
-        this.partitionPendingOps = new HashMap<>();
+        this.primaryIndexTrackers = new ConcurrentHashMap<>();
+        this.resourcePendingOps = new ConcurrentHashMap<>();
+        this.partitionPendingOps = new ConcurrentHashMap<>();
     }
 
     @Override
     public void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
             boolean primaryIndex) {
         super.register(resourceId, partition, index, callback, primaryIndex);
-        synchronized (txnOpTrackers) {
-            AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
-            resourcePendingOps.put(resourceId, pendingOps);
-            if (primaryIndex) {
-                Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
-                        new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
-                primaryIndexTrackers.put(partition, pair);
-            }
+        AtomicInteger pendingOps = partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+        resourcePendingOps.put(resourceId, pendingOps);
+        if (primaryIndex) {
+            Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> pair =
+                    new Pair<>((PrimaryIndexOperationTracker) index.getOperationTracker(), callback);
+            primaryIndexTrackers.put(partition, pair);
         }
     }