add per-thread cache for dataset locks (they are only given up when the job finishes, so a cache hit means the lock is still held)
minor cleanup
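
Dataset-level intention locks (IS/IX) are only given up when the job
finishes, so once a thread has taken one for a job it can skip the shared
lock table on every later request for the same dataset and mode. A minimal,
self-contained sketch of the pattern, kept deliberately close to the patch
below (acquireDatasetLock() and lockEntity() are illustrative stand-ins,
not the real ConcurrentLockManager methods):

    import java.util.HashMap;

    // Sketch only -- not the actual ConcurrentLockManager code.
    class CachedDatasetLocking {

        // same shape as the DatasetLockCache class added in this patch
        static class DatasetLockCache {
            private long jobId = -1;
            private final HashMap<Integer, Byte> lockCache = new HashMap<Integer, Byte>();

            boolean contains(int jobId, int dsId, byte dsLockMode) {
                if (this.jobId == jobId) {
                    final Byte cached = lockCache.get(dsId);
                    return cached != null && cached == dsLockMode;
                }
                this.jobId = -1;   // different job: every cached lock is stale
                lockCache.clear();
                return false;
            }

            void put(int jobId, int dsId, byte dsLockMode) {
                this.jobId = jobId;
                lockCache.put(dsId, dsLockMode);
            }
        }

        // one cache per thread, so no synchronization is needed
        private final ThreadLocal<DatasetLockCache> dsLockCache =
                new ThreadLocal<DatasetLockCache>() {
                    @Override
                    protected DatasetLockCache initialValue() {
                        return new DatasetLockCache();
                    }
                };

        // stand-in for the real call into the shared lock table
        private void acquireDatasetLock(int dsId, byte dsLockMode) {
        }

        void lockEntity(int jobId, int dsId, byte dsLockMode) {
            // hit the shared lock table only if this thread has not yet
            // taken the dataset lock in this mode for this job
            if (!dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
                acquireDatasetLock(dsId, dsLockMode); // may block on conflict
                dsLockCache.get().put(jobId, dsId, dsLockMode);
            }
            // ... then take the entity-level lock as before ...
        }
    }

Keeping the dataset lock after a failed entity tryLock (instead of rolling
it back, as the old code did) is what makes the cache safe: a cached entry
can never refer to a lock that was released in the middle of a job.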
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 124b406..635ee42 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -51,6 +52,7 @@
private RequestArenaManager reqArenaMgr;
private JobArenaManager jobArenaMgr;
private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+ private ThreadLocal<DatasetLockCache> dsLockCache;
enum LockAction {
GET,
@@ -80,6 +82,11 @@
reqArenaMgr = new RequestArenaManager(lockManagerShrinkTimer);
jobArenaMgr = new JobArenaManager(lockManagerShrinkTimer);
jobIdSlotMap = new ConcurrentHashMap<>();
+ dsLockCache = new ThreadLocal<DatasetLockCache>() {
+ protected DatasetLockCache initialValue() {
+ return new DatasetLockCache();
+ }
+ };
}
public AsterixTransactionProperties getTransactionProperties() {
@@ -92,15 +99,19 @@
log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
if (entityHashValue != -1) {
// get the intention lock on the dataset, if we want to lock an individual item
- byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
- lock(datasetId, -1, dsLockMode, txnContext);
+ final byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ lock(datasetId, -1, dsLockMode, txnContext);
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
}
- int dsId = datasetId.getId();
-
- long jobSlot = getJobSlot(txnContext.getJobId().getId());
+ long jobSlot = getJobSlot(jobId);
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
@@ -195,16 +206,21 @@
throws ACIDException {
log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
if (entityHashValue != -1) {
// get the intention lock on the dataset, if we want to lock an individual item
byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
- if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
- return false;
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+ return false;
+ }
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
}
}
- int dsId = datasetId.getId();
- long jobSlot = getJobSlot(txnContext.getJobId().getId());
+ long jobSlot = getJobSlot(jobId);
boolean locked = false;
@@ -245,10 +261,8 @@
group.releaseLatch();
}
- // if we did acquire the dataset lock, but not the entity lock, we need to remove it
- if (!locked && entityHashValue != -1) {
- unlock(datasetId, -1, txnContext);
- }
+ // if we did acquire the dataset lock, but not the entity lock, we keep
+ // it anyway and clean it up at the end of the job
return locked;
}
@@ -315,11 +329,7 @@
group.releaseLatch();
}
- // finally remove the intention lock as well
- if (entityHashValue != -1) {
- // remove the intention lock on the dataset, if we want to lock an individual item
- unlock(datasetId, -1, txnContext);
- }
+ // dataset intention locks are cleaned up at the end of the job
}
@Override
@@ -387,17 +397,6 @@
return reqSlot;
}
- private long findLastHolderForJob(long resource, long job) {
- long holder = resArenaMgr.getLastHolder(resource);
- while (holder != -1) {
- if (job == reqArenaMgr.getJobSlot(holder)) {
- return holder;
- }
- holder = reqArenaMgr.getNextRequest(holder);
- }
- return -1;
- }
-
/**
* when we've got a lock conflict for a different job, we always have to
* wait, if it is for the same job we either have to
@@ -410,8 +409,8 @@
* @return
*/
private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
- // TODO we can reduce the numer of things we have to look at by carefully
- // distinguishing the different lock modes
+ // TODO we can reduce the number of things we have to look at by
+ // carefully distinguishing the different lock modes
long holder = resArenaMgr.getLastHolder(resource);
LockAction res = LockAction.WAIT;
while (holder != -1) {
@@ -822,6 +821,37 @@
}
}
}
+
+    private static class DatasetLockCache {
+        private long jobId = -1;
+        private HashMap<Integer, Byte> lockCache = new HashMap<Integer, Byte>();
+
+        // Returns true iff (dsId, dsLockMode) is cached for the given job.
+        // Seeing a different job invalidates the whole cache: the previous
+        // job's dataset locks were released when it finished.
+        public boolean contains(final int jobId, final int dsId, byte dsLockMode) {
+            if (this.jobId == jobId) {
+                final Byte cachedLockMode = this.lockCache.get(dsId);
+                if (cachedLockMode != null && cachedLockMode == dsLockMode) {
+                    return true;
+                }
+            } else {
+                this.jobId = -1;
+                this.lockCache.clear();
+            }
+            return false;
+        }
+
+        public void put(final int jobId, final int dsId, byte dsLockMode) {
+            this.jobId = jobId;
+            this.lockCache.put(dsId, dsLockMode);
+        }
+
+        @Override
+        public String toString() {
+            return "[ " + jobId + " : " + lockCache.toString() + " ]";
+        }
+    }
private static class ResourceGroupTable {
public static final int TABLE_SIZE = 10; // TODO increase
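
Note that DatasetLockCache is deliberately unsynchronized: each instance is
confined to a single thread by the ThreadLocal above. The intended semantics,
as a quick sketch (job and dataset ids are made up; LockMode.IS/IX are the
byte constants used in the patch):

    DatasetLockCache cache = new DatasetLockCache();
    cache.put(1, 7, LockMode.IS);
    cache.contains(1, 7, LockMode.IS); // true: same job, same mode
    cache.contains(1, 7, LockMode.IX); // false: mode differs, take the lock
    cache.contains(2, 7, LockMode.IS); // false: a new job clears the cache

Since put() overwrites the entry for a dataset, only the most recently
cached mode per dataset is remembered; asking again in an older mode simply
falls back to the lock table.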