add implementations for instantLock, tryLock, and instantTryLock
refactor lock(...)
remove references to LockManager.IS_DEBUG_MODE
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index dbb56c5..a208a93 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -95,56 +95,85 @@
}
int dsId = datasetId.getId();
+ int jobId = txnContext.getJobId().getId();
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
- // 1) Find the resource in the hash table
-
- int resSlot = findResourceInGroup(group, dsId, entityHashValue);
-
- if (resSlot == -1) {
- // we don't know about this resource, let's alloc a slot
- resSlot = resArenaMgr.allocate();
- resArenaMgr.setDatasetId(resSlot, datasetId.getId());
- resArenaMgr.setPkHashVal(resSlot, entityHashValue);
+ try {
+ // 1) Find the resource in the hash table
+ int resSlot = findResource(group, dsId, entityHashValue);
+ // 2) create a request entry
+ int reqSlot = createRequest(resSlot, jobId, lockMode);
+ // 3) check lock compatibility
+ boolean locked = false;
- if (group.firstResourceIndex.get() == -1) {
- group.firstResourceIndex.set(resSlot);
+ while (! locked) {
+ int curLockMode = resArenaMgr.getMaxMode(resSlot);
+ switch (ACTION_MATRIX[curLockMode][lockMode]) {
+ case UPD:
+ resArenaMgr.setMaxMode(resSlot, lockMode);
+ // no break
+ case GET:
+ addHolderToResource(resSlot, reqSlot);
+ locked = true;
+ break;
+ case WAIT:
+ // TODO can we have more than on upgrader? Or do we need to
+ // abort if we get a second upgrader?
+ if (findLastHolderForJob(resSlot, jobId) != -1) {
+ addUpgraderToResource(resSlot, reqSlot);
+ } else {
+ addWaiterToResource(resSlot, reqSlot);
+ }
+ try {
+ group.await();
+ } catch (InterruptedException e) {
+ throw new ACIDException(txnContext, "interrupted", e);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ // TODO where do we check for deadlocks?
+ }
+ } finally {
+ group.releaseLatch();
+ }
+ }
+
+ @Override
+ public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+ lock(datasetId, entityHashValue, lockMode, txnContext);
+ unlock(datasetId, entityHashValue, txnContext);
+ }
+
+ @Override
+ public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+ return false;
}
}
-
- // 2) create a request entry
+ int dsId = datasetId.getId();
int jobId = txnContext.getJobId().getId();
- int reqSlot = reqArenaMgr.allocate();
- reqArenaMgr.setResourceId(reqSlot, resSlot);
- reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
- reqArenaMgr.setJobId(reqSlot, jobId);
-
- int prevHead = -1;
- Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
- while (headOfJobReqQueue != null) {
- // TODO make sure this works (even if the job gets removed from the table)
- if (jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
- prevHead = headOfJobReqQueue;
- break;
- }
- headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
- }
- // this goes across arenas
- reqArenaMgr.setNextJobRequest(reqSlot, prevHead);
- reqArenaMgr.setPrevJobRequest(reqSlot, -1);
- if (prevHead >= 0) {
- reqArenaMgr.setPrevJobRequest(prevHead, reqSlot);
- }
-
- // 3) check lock compatibility
-
- boolean locked = false;
-
- while (! locked) {
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+
+ try {
+ // 1) Find the resource in the hash table
+ int resSlot = findResource(group, dsId, entityHashValue);
+ // 2) create a request entry
+ int reqSlot = createRequest(resSlot, jobId, lockMode);
+ // 3) check lock compatibility
+
int curLockMode = resArenaMgr.getMaxMode(resSlot);
switch (ACTION_MATRIX[curLockMode][lockMode]) {
case UPD:
@@ -152,30 +181,26 @@
// no break
case GET:
addHolderToResource(resSlot, reqSlot);
- locked = true;
- break;
+ return true;
case WAIT:
- // TODO can we have more than on upgrader? Or do we need to
- // abort if we get a second upgrader?
- if (findLastHolderForJob(resSlot, jobId) != -1) {
- addUpgraderToResource(resSlot, reqSlot);
- } else {
- addWaiterToResource(resSlot, reqSlot);
- }
- try {
- group.await();
- } catch (InterruptedException e) {
- throw new ACIDException(txnContext, "interrupted", e);
- }
- break;
+ return false;
+ default:
+ throw new IllegalStateException();
}
-
// TODO where do we check for deadlocks?
+ } finally {
+ group.releaseLatch();
}
-
- group.releaseLatch();
-
- //internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
+ }
+
+ @Override
+ public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+ ITransactionContext txnContext) throws ACIDException {
+ if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
+ unlock(datasetId, entityHashValue, txnContext);
+ return true;
+ }
+ return false;
}
@Override
@@ -266,8 +291,6 @@
// remove the intention lock on the dataset, if we want to lock an individual item
unlock(datasetId, -1, txnContext);
}
-
- //internalUnlock(datasetId, entityHashValue, txnContext, false, false);
}
@Override
@@ -283,6 +306,48 @@
}
}
+ private int findResource(ResourceGroup group, int dsId, int entityHashValue) {
+ int resSlot = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resSlot == -1) {
+ // we don't know about this resource, let's alloc a slot
+ resSlot = resArenaMgr.allocate();
+ resArenaMgr.setDatasetId(resSlot, dsId);
+ resArenaMgr.setPkHashVal(resSlot, entityHashValue);
+
+ if (group.firstResourceIndex.get() == -1) {
+ group.firstResourceIndex.set(resSlot);
+ }
+ }
+ return resSlot;
+ }
+
+ private int createRequest(int resSlot, int jobId, byte lockMode) {
+ int reqSlot = reqArenaMgr.allocate();
+ reqArenaMgr.setResourceId(reqSlot, resSlot);
+ reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+ reqArenaMgr.setJobId(reqSlot, jobId);
+
+ int prevHead = -1;
+ Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+ while (headOfJobReqQueue != null) {
+ // TODO make sure this works (even if the job gets removed from the table)
+ if (jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
+ prevHead = headOfJobReqQueue;
+ break;
+ }
+ headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+ }
+
+ // this goes across arenas
+ reqArenaMgr.setNextJobRequest(reqSlot, prevHead);
+ reqArenaMgr.setPrevJobRequest(reqSlot, -1);
+ if (prevHead >= 0) {
+ reqArenaMgr.setPrevJobRequest(prevHead, reqSlot);
+ }
+ return reqSlot;
+ }
+
private int findLastHolderForJob(int resource, int job) {
int holder = resArenaMgr.getLastHolder(resource);
while (holder != -1) {
@@ -371,8 +436,6 @@
&& resArenaMgr.getFirstUpgrader(resource) == -1
&& resArenaMgr.getFirstWaiter(resource) == -1;
}
-
-
private void validateJob(ITransactionContext txnContext) throws ACIDException {
if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
@@ -382,48 +445,6 @@
}
}
- //@Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
- throws ACIDException {
- throw new IllegalStateException();
- //internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
- }
-
- private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
- throws ACIDException {
- throw new IllegalStateException();
- //internalUnlock(datasetId, entityHashValue, txnContext, true, false);
- }
-
- @Override
- public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
- throws ACIDException {
-
- // try {
- // internalLock(datasetId, entityHashValue, lockMode, txnContext);
- // return;
- // } finally {
- // unlock(datasetId, entityHashValue, txnContext);
- // }
- throw new IllegalStateException();
- //internalLock(datasetId, entityHashValue, lockMode, txnContext, true);
- //instantUnlock(datasetId, entityHashValue, txnContext);
- }
-
- @Override
- public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
- throws ACIDException {
- throw new IllegalStateException();
- //return internalTryLock(datasetId, entityHashValue, lockMode, txnContext, false);
- }
-
- @Override
- public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
- ITransactionContext txnContext) throws ACIDException {
- throw new IllegalStateException();
- //return internalInstantTryLock(datasetId, entityHashValue, lockMode, txnContext);
- }
-
private void trackLockRequest(String msg, int requestType, DatasetId datasetIdObj, int entityHashValue,
byte lockMode, ITransactionContext txnContext, DatasetLockInfo dLockInfo, int eLockInfo) {
/*
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
index e0bee72..2d47185 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
@@ -269,10 +269,6 @@
int currentSlot = freeSlotNum;
freeSlotNum = getNextFreeSlot(currentSlot);
occupiedSlots++;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName()
- + " allocate: " + currentSlot);
- }
return currentSlot;
}
@@ -280,10 +276,6 @@
setNextFreeSlot(slotNum, freeSlotNum);
freeSlotNum = slotNum;
occupiedSlots--;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName()
- + " deallocate: " + slotNum);
- }
}
public int getNextFreeSlot(int slotNum) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
index a61c221..aeeb395 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
@@ -284,10 +284,6 @@
bb.putInt(currentSlot * ITEM_SIZE + FIRST_UPGRADER_OFF, -1);
bb.putInt(currentSlot * ITEM_SIZE + MAX_MODE_OFF, LockMode.NL);
occupiedSlots++;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName()
- + " allocate: " + currentSlot);
- }
return currentSlot;
}
@@ -295,10 +291,6 @@
setNextFreeSlot(slotNum, freeSlotNum);
freeSlotNum = slotNum;
occupiedSlots--;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName()
- + " deallocate: " + slotNum);
- }
}
public int getNextFreeSlot(int slotNum) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
index e79093b..0f82d72 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
@@ -204,10 +204,6 @@
freeSlotNum = getNextFreeSlot(currentSlot);
@INIT_SLOT@
occupiedSlots++;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName()
- + " allocate: " + currentSlot);
- }
return currentSlot;
}
@@ -215,10 +211,6 @@
setNextFreeSlot(slotNum, freeSlotNum);
freeSlotNum = slotNum;
occupiedSlots--;
- if (LockManager.IS_DEBUG_MODE) {
- System.out.println(Thread.currentThread().getName()
- + " deallocate: " + slotNum);
- }
}
public int getNextFreeSlot(int slotNum) {