more efficient implementations for instantLock and instantTryLock
add method to determine the "intention mode" of a lock mode
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 9f98680..44bc607 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -108,7 +108,6 @@
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
-
log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
final int dsId = datasetId.getId();
@@ -116,30 +115,25 @@
if (entityHashValue != -1) {
// get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ final byte dsLockMode = LockMode.intentionMode(lockMode);
if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
lock(datasetId, -1, dsLockMode, txnContext);
dsLockCache.get().put(jobId, dsId, dsLockMode);
}
}
- long jobSlot = findOrAllocJobSlot(jobId);
+ final long jobSlot = findOrAllocJobSlot(jobId);
- ResourceGroup group = table.get(datasetId, entityHashValue);
+ final ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
-
try {
validateJob(txnContext);
- // 1) Find the resource in the hash table
- long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
- // 2) create a request entry
- long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
- // 3) check lock compatibility
+ final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+ final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
boolean locked = false;
-
while (! locked) {
- LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+ final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
@@ -150,17 +144,7 @@
break;
case WAIT:
case CONV:
- final Queue queue = act.modify ? Upgrader : Waiter;
- if (! introducesDeadlock(resSlot, jobSlot)) {
- queue.add(reqSlot, resSlot, jobSlot);
- } else {
- requestAbort(txnContext);
- }
- try {
- group.await(txnContext);
- } finally {
- queue.remove(reqSlot, resSlot, jobSlot);
- }
+ enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
break;
case ERR:
default:
@@ -172,6 +156,22 @@
}
}
+ private void enqueueWaiter(final ResourceGroup group, final long reqSlot,
+ final long resSlot, final long jobSlot, final LockAction act,
+ ITransactionContext txnContext) throws ACIDException {
+ final Queue queue = act.modify ? Upgrader : Waiter;
+ if (! introducesDeadlock(resSlot, jobSlot)) {
+ queue.add(reqSlot, resSlot, jobSlot);
+ } else {
+ requestAbort(txnContext);
+ }
+ try {
+ group.await(txnContext);
+ } finally {
+ queue.remove(reqSlot, resSlot, jobSlot);
+ }
+ }
+
/**
* determine if adding a job to the waiters of a resource will introduce a
* cycle in the wait-graph where the job waits on itself
@@ -179,7 +179,7 @@
* @param jobSlot the slot that contains the information about the job
* @return true if a cycle would be introduced, false otherwise
*/
- private boolean introducesDeadlock(long resSlot, long jobSlot) {
+ private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
long reqSlot = resArenaMgr.getLastHolder(resSlot);
while (reqSlot >= 0) {
long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
@@ -208,9 +208,69 @@
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ final byte dsLockMode = LockMode.intentionMode(lockMode);
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ lock(datasetId, -1, dsLockMode, txnContext);
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
+ }
- lock(datasetId, entityHashValue, lockMode, txnContext);
- unlock(datasetId, entityHashValue, lockMode, txnContext);
+ final ResourceGroup group = table.get(datasetId, entityHashValue);
+ if (group.firstResourceIndex.get() == -1l) {
+ validateJob(txnContext);
+ // if we do not have a resource in the group, we know that the
+ // resource that we are looking for is not locked
+ return;
+ }
+
+ // we only allocate a request slot if we actually have to wait
+ long reqSlot = -1;
+
+ group.getLatch();
+ try {
+ validateJob(txnContext);
+
+ final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+ if (resSlot < 0) {
+ // if we don't find the resource, there are no locks on it.
+ return;
+ }
+
+ final long jobSlot = findOrAllocJobSlot(jobId);
+
+ boolean locked = false;
+ while (! locked) {
+ final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+ switch (act) {
+ case UPD:
+ case GET:
+ locked = true;
+ break;
+ case WAIT:
+ case CONV:
+ if (reqSlot == -1) {
+ reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+ }
+ enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+ break;
+ case ERR:
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ } finally {
+ if (reqSlot != -1) {
+ // deallocate request, if we allocated one earlier
+ reqArenaMgr.deallocate(reqSlot);
+ }
+ group.releaseLatch();
+ }
}
@Override
@@ -223,7 +283,7 @@
if (entityHashValue != -1) {
// get the intention lock on the dataset, if we want to lock an individual item
- byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ final byte dsLockMode = LockMode.intentionMode(lockMode);
if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
return false;
@@ -232,35 +292,28 @@
}
}
- long jobSlot = findOrAllocJobSlot(jobId);
+ final long jobSlot = findOrAllocJobSlot(jobId);
- boolean locked = false;
-
- ResourceGroup group = table.get(datasetId, entityHashValue);
+ final ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
try {
validateJob(txnContext);
- // 1) Find the resource in the hash table
- long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
- // 2) create a request entry
- long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
- // 3) check lock compatibility
+ final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+ final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
- LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+ final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
// no break
case GET:
addHolder(reqSlot, resSlot, jobSlot);
- locked = true;
- break;
+ return true;
case WAIT:
case CONV:
- locked = false;
- break;
+ return false;
default:
throw new IllegalStateException();
}
@@ -270,8 +323,6 @@
// if we did acquire the dataset lock, but not the entity lock, we keep
// it anyway and clean it up at the end of the job
-
- return locked;
}
@Override
@@ -279,11 +330,55 @@
ITransactionContext txnContext) throws ACIDException {
log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
- if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
- unlock(datasetId, entityHashValue, lockMode, txnContext);
+ final int dsId = datasetId.getId();
+ final int jobId = txnContext.getJobId().getId();
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ final byte dsLockMode = LockMode.intentionMode(lockMode);
+ if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+ if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+ return false;
+ }
+ dsLockCache.get().put(jobId, dsId, dsLockMode);
+ }
+ }
+
+ final ResourceGroup group = table.get(datasetId, entityHashValue);
+ if (group.firstResourceIndex.get() == -1l) {
+ validateJob(txnContext);
+ // if we do not have a resource in the group, we know that the
+ // resource that we are looking for is not locked
return true;
}
- return false;
+
+ group.getLatch();
+ try {
+ validateJob(txnContext);
+
+ final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+ if (resSlot < 0) {
+ // if we don't find the resource, there are no locks on it.
+ return true;
+ }
+
+ final long jobSlot = findOrAllocJobSlot(jobId);
+
+ LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+ switch (act) {
+ case UPD:
+ case GET:
+ return true;
+ case WAIT:
+ case CONV:
+ return false;
+ case ERR:
+ default:
+ throw new IllegalStateException();
+ }
+ } finally {
+ group.releaseLatch();
+ }
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 1a73fe0..c17689b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -37,6 +37,15 @@
public static final byte S = 3;
public static final byte X = 4;
+ public static byte intentionMode(byte mode) {
+ switch (mode) {
+ case S: return IS;
+ case X: return IX;
+ default: throw new IllegalArgumentException(
+ "no intention lock mode for " + toString(mode));
+ }
+ }
+
public static String toString(byte mode) {
switch (mode) {
case NL: return "NL";