always lock the whole dataset in PrimaryIndexSearchOperationCallback
manage the dataset lock cache in the lock calls on the dataset itself
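
For illustration only (not part of the patch): a minimal sketch of the caching scheme after this change. The names DsLockCacheSketch, acquireFromLockTable, key, and the ThreadLocal-backed cache are placeholders assumed for the sketch, not the real ConcurrentLockManager internals. Dataset-level requests (entityHashValue == -1) consult and update the per-job cache; entity-level requests delegate to a recursive dataset-level call, so the cache is read and written in exactly one place.

    import java.util.HashSet;
    import java.util.Set;

    class DsLockCacheSketch {
        static final byte S = 0, X = 1, IS = 2, IX = 3;

        // placeholder per-thread cache of dataset locks already granted,
        // keyed by job/dataset/mode (stand-in for dsLockCache in the real code)
        private final ThreadLocal<Set<String>> dsLockCache =
                ThreadLocal.withInitial(HashSet::new);

        void lock(int dsId, int entityHashValue, byte lockMode, int jobId) {
            if (entityHashValue != -1) {
                // entity lock: take the dataset intention lock first; the recursive
                // call below is the only place that touches the cache
                lock(dsId, -1, intentionMode(lockMode), jobId);
            } else if (dsLockCache.get().contains(key(jobId, dsId, lockMode))) {
                // dataset lock already held by this job: nothing to do
                return;
            }
            acquireFromLockTable(dsId, entityHashValue, lockMode, jobId);
            if (entityHashValue == -1) {
                // cache only after the dataset lock has actually been granted
                dsLockCache.get().add(key(jobId, dsId, lockMode));
            }
        }

        private static byte intentionMode(byte lockMode) {
            // stand-in for LockMode.intentionMode(): S -> IS, X -> IX
            return lockMode == S ? IS : IX;
        }

        private static String key(int jobId, int dsId, byte mode) {
            return jobId + ":" + dsId + ":" + mode;
        }

        private void acquireFromLockTable(int dsId, int entityHashValue,
                                          byte lockMode, int jobId) {
            // stand-in for the latched resource-table logic in ConcurrentLockManager
        }
    }
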
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index 9957edf..8de7ca5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -36,9 +36,8 @@
@Override
public boolean proceed(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- return lockManager.tryLock(datasetId, pkHash, LockMode.S, txnCtx);
+ return lockManager.tryLock(datasetId, -1, LockMode.S, txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
@@ -46,9 +45,8 @@
@Override
public void reconcile(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx);
+ lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
@@ -56,12 +54,7 @@
@Override
public void cancel(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
- try {
- lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
- } catch (ACIDException e) {
- throw new HyracksDataException(e);
- }
+        // no op: the dataset-level lock acquired in reconcile() is kept and released at the end of the job
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index ddf409e..915271d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -116,11 +116,10 @@
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- lock(datasetId, -1, dsLockMode, txnContext);
- dsLockCache.get().put(jobId, dsId, dsLockMode);
+ lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
+ } else {
+ if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
+ return;
}
}
@@ -153,6 +152,9 @@
throw new IllegalStateException();
}
}
+ if (entityHashValue == -1) {
+ dsLockCache.get().put(jobId, dsId, lockMode);
+ }
} finally {
group.releaseLatch();
}
@@ -218,12 +220,9 @@
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- lock(datasetId, -1, dsLockMode, txnContext);
- dsLockCache.get().put(jobId, dsId, dsLockMode);
- }
+ lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
+ } else {
+ throw new UnsupportedOperationException("instant locks are not supported on datasets");
}
final ResourceGroup group = table.get(dsId, entityHashValue);
@@ -287,13 +286,12 @@
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
- return false;
- }
- dsLockCache.get().put(jobId, dsId, dsLockMode);
+ if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+ return false;
+ }
+ } else {
+ if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
+ return true;
}
}
@@ -307,7 +305,7 @@
final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
-
+
final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
@@ -315,6 +313,9 @@
// no break
case GET:
addHolder(reqSlot, resSlot, jobSlot);
+ if (entityHashValue == -1) {
+ dsLockCache.get().put(jobId, dsId, lockMode);
+ }
return true;
case WAIT:
case CONV:
@@ -325,7 +326,7 @@
} finally {
group.releaseLatch();
}
-
+
// if we did acquire the dataset lock, but not the entity lock, we keep
// it anyway and clean it up at the end of the job
}
@@ -340,14 +341,11 @@
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
- return false;
- }
- dsLockCache.get().put(jobId, dsId, dsLockMode);
+ if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+ return false;
}
+ } else {
+ throw new UnsupportedOperationException("instant locks are not supported on datasets");
}
final ResourceGroup group = table.get(dsId, entityHashValue);