Fix the case where the same job re-acquires the exact same lock.
Add debugging code (slot allocation tracking and lock-operation logging).
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
index 66f366e..bd8dee4 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -100,6 +100,11 @@
final Field field = resource.fields.get(i);
field.appendInitializers(sb, indent, 3);
}
+ } else if (line.contains("@CHECK_SLOT@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendChecks(sb, indent, 3);
+ }
} else if (line.contains("@PRINT_BUFFER@")) {
resource.appendBufferPrinter(sb, indent, 3);
sb.append('\n');
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
index 1606afc..26ca4c4 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -50,7 +50,7 @@
}
return sb.toString();
}
-
+
StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
sb = indent(sb, indent, level);
sb.append("public ")
@@ -59,7 +59,11 @@
.append(methodName("get"))
.append("(int slotNum) {\n");
sb = indent(sb, indent, level + 1);
- sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+ sb.append("final Buffer buf = buffers.get(slotNum / NO_SLOTS);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("buf.checkSlot(slotNum % NO_SLOTS);\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buf.bb;\n");
sb = indent(sb, indent, level + 1);
sb.append("return b.")
.append(bbGetter(type))
@@ -155,6 +159,22 @@
return sb;
}
+ StringBuilder appendChecks(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("if (bb.")
+ .append(bbGetter(type))
+ .append("(itemOffset + ")
+ .append(offsetName())
+ .append(") == ")
+ .append(deadMemInitializer(type))
+ .append(") {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("System.err.println(allocList.get(slotNum));\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
String offsetName() {
String words[] = name.split(" ");
assert(words.length > 0);
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index 01df3f0..801d556 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -178,10 +180,14 @@
}
static class Buffer {
+ public static final boolean TRACK_ALLOC = true;
+
private ByteBuffer bb;
private int freeSlotNum;
private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
+ private ArrayList<Alloc> allocList;
+
Buffer() {
initialize();
}
@@ -195,6 +201,13 @@
setNextFreeSlot(i, i + 1);
}
setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+
+ if (TRACK_ALLOC) {
+ allocList = new ArrayList<Alloc>(NO_SLOTS);
+ for (int i = 0; i < NO_SLOTS; ++i) {
+ allocList.add(new Alloc());
+ }
+ }
}
public void deinitialize() {
@@ -219,6 +232,7 @@
freeSlotNum = getNextFreeSlot(slotNum);
@INIT_SLOT@
occupiedSlots++;
+ if (TRACK_ALLOC) allocList.get(slotNum).alloc();
return slotNum;
}
@@ -227,6 +241,7 @@
setNextFreeSlot(slotNum, freeSlotNum);
freeSlotNum = slotNum;
occupiedSlots--;
+ if (TRACK_ALLOC) allocList.get(slotNum).free();
}
public int getNextFreeSlot(int slotNum) {
@@ -252,6 +267,36 @@
append(sb);
return sb.toString();
}
+
+ private void checkSlot(int slotNum) {
+ final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
+ // @CHECK_SLOT@
+ }
+
+ static class Alloc {
+ String alloc;
+ String free;
+
+ void alloc() {
+ alloc = getStackTrace();
+ }
+
+ void free() {
+ free = getStackTrace();
+ }
+
+ private String getStackTrace() {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ new Exception().printStackTrace(pw);
+ pw.close();
+ return sw.toString();
+ }
+
+ public String toString() {
+ return "allocation stack:\n" + alloc + "\nfree stack\n" + free;
+ }
+ }
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index b1507eb..9679f86 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -56,16 +56,17 @@
enum LockAction {
GET,
UPD, // special version of GET that updates the max lock mode
- WAIT
+ WAIT,
+ CONV // convert (upgrade) a lock (e.g. from S to X)
}
static LockAction[][] ACTION_MATRIX = {
// new NL IS IX S X
- { LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
- { LockAction.WAIT, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
- { LockAction.WAIT, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
- { LockAction.WAIT, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
- { LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+ { LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
+ { LockAction.GET, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
+ { LockAction.GET, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
+ { LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
+ { LockAction.GET, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
};
public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
@@ -90,6 +91,8 @@
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
+ log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
if (entityHashValue != -1) {
// get the intention lock on the dataset, if we want to lock an individual item
byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
@@ -113,7 +116,11 @@
while (! locked) {
int curLockMode = resArenaMgr.getMaxMode(resSlot);
- switch (ACTION_MATRIX[curLockMode][lockMode]) {
+ LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+ if (act == LockAction.WAIT) {
+ act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+ }
+ switch (act) {
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
// no break
@@ -122,29 +129,21 @@
locked = true;
break;
case WAIT:
+ if (! introducesDeadlock(resSlot, jobSlot)) {
+ addWaiter(reqSlot, resSlot, jobSlot);
+ } else {
+ requestAbort(txnContext);
+ }
+ group.await(txnContext);
+ removeWaiter(reqSlot, resSlot, jobSlot);
+ break;
+ case CONV:
// TODO can we have more than on upgrader? Or do we need to
// abort if we get a second upgrader?
- boolean upgrade = findLastHolderForJob(resSlot, jobSlot) != -1;
- if (upgrade) {
- addUpgrader(reqSlot, resSlot, jobSlot);
- } else {
- if (! introducesDeadlock(resSlot, jobSlot)) {
- addWaiter(reqSlot, resSlot, jobSlot);
- } else {
- requestAbort(txnContext);
- }
- }
- try {
- group.await();
- if (upgrade) {
- removeUpgrader(reqSlot, resSlot, jobSlot);
- } else {
- removeWaiter(reqSlot, resSlot, jobSlot);
- }
- } catch (InterruptedException e) {
- throw new ACIDException(txnContext, "interrupted", e);
- }
- break;
+ addUpgrader(reqSlot, resSlot, jobSlot);
+ group.await(txnContext);
+ removeUpgrader(reqSlot, resSlot, jobSlot);
+ break;
default:
throw new IllegalStateException();
}
@@ -184,6 +183,8 @@
@Override
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
+ log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
lock(datasetId, entityHashValue, lockMode, txnContext);
unlock(datasetId, entityHashValue, txnContext);
}
@@ -191,6 +192,7 @@
@Override
public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
+ log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
if (entityHashValue != -1) {
// get the intention lock on the dataset, if we want to lock an individual item
@@ -203,6 +205,8 @@
int dsId = datasetId.getId();
int jobSlot = getJobSlot(txnContext.getJobId().getId());
+ boolean locked = false;
+
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
@@ -212,17 +216,24 @@
// 2) create a request entry
int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
// 3) check lock compatibility
-
+
int curLockMode = resArenaMgr.getMaxMode(resSlot);
- switch (ACTION_MATRIX[curLockMode][lockMode]) {
+ LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+ if (act == LockAction.WAIT) {
+ act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+ }
+ switch (act) {
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
// no break
case GET:
addHolder(reqSlot, resSlot, jobSlot);
- return true;
+ locked = true;
+ break;
case WAIT:
- return false;
+ case CONV:
+ locked = false;
+ break;
default:
throw new IllegalStateException();
}
@@ -230,11 +241,20 @@
} finally {
group.releaseLatch();
}
+
+ // if we did acquire the dataset lock, but not the entity lock, we need to remove it
+ if (!locked && entityHashValue != -1) {
+ unlock(datasetId, -1, txnContext);
+ }
+
+ return locked;
}
@Override
public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
ITransactionContext txnContext) throws ACIDException {
+ log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
unlock(datasetId, entityHashValue, txnContext);
return true;
@@ -244,49 +264,54 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+ log("unlock", datasetId.getId(), entityHashValue, LockMode.NL, txnContext);
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
- int dsId = datasetId.getId();
- int resource = findResourceInGroup(group, dsId, entityHashValue);
-
- if (resource < 0) {
- throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
- }
-
- int jobId = txnContext.getJobId().getId();
- int jobSlot = getJobSlot(jobId);
+ try {
- // since locking is properly nested, finding the last holder for a job is good enough
- int holder = removeLastHolder(resource, jobSlot);
-
- // deallocate request
- reqArenaMgr.deallocate(holder);
- // deallocate resource or fix max lock mode
- if (resourceNotUsed(resource)) {
- int prev = group.firstResourceIndex.get();
- if (prev == resource) {
- group.firstResourceIndex.set(resArenaMgr.getNext(resource));
- } else {
- while (resArenaMgr.getNext(prev) != resource) {
- prev = resArenaMgr.getNext(prev);
+ int dsId = datasetId.getId();
+ int resource = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resource < 0) {
+ throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
+ }
+
+ int jobId = txnContext.getJobId().getId();
+ int jobSlot = getJobSlot(jobId);
+
+ // since locking is properly nested, finding the last holder for a job is good enough
+ int holder = removeLastHolder(resource, jobSlot);
+
+ // deallocate request
+ reqArenaMgr.deallocate(holder);
+ // deallocate resource or fix max lock mode
+ if (resourceNotUsed(resource)) {
+ int prev = group.firstResourceIndex.get();
+ if (prev == resource) {
+ group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+ } else {
+ while (resArenaMgr.getNext(prev) != resource) {
+ prev = resArenaMgr.getNext(prev);
+ }
+ resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
}
- resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+ resArenaMgr.deallocate(resource);
+ } else {
+ final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+ final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
+ resArenaMgr.setMaxMode(resource, newMaxMode);
+ if (oldMaxMode != newMaxMode) {
+ // the max lock mode changed, so current waiters may now be able to acquire
+ // the lock and we signal them (if it had not changed, no signal would be needed)
+ group.wakeUp();
+ }
}
- resArenaMgr.deallocate(resource);
- } else {
- final int oldMaxMode = resArenaMgr.getMaxMode(resource);
- final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
- resArenaMgr.setMaxMode(resource, newMaxMode);
- if (oldMaxMode != newMaxMode) {
- // the locking mode didn't change, current waiters won't be
- // able to acquire the lock, so we do not need to signal them
- group.wakeUp();
- }
+ } finally {
+ group.releaseLatch();
}
- group.releaseLatch();
-
+
// finally remove the intention lock as well
if (entityHashValue != -1) {
// remove the intention lock on the dataset, if we want to lock an individual item
@@ -296,8 +321,14 @@
@Override
public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+ log("releaseLocks", -1, -1, LockMode.NL, txnContext);
+
int jobId = txnContext.getJobId().getId();
Integer jobSlot = jobIdSlotMap.get(jobId);
+ if (jobSlot == null) {
+ // we don't know the job, so there are no locks for it - we're done
+ return;
+ }
int holder = jobArenaMgr.getLastHolder(jobSlot);
while (holder != -1) {
int resource = reqArenaMgr.getResourceId(holder);
@@ -335,10 +366,8 @@
resSlot = resArenaMgr.allocate();
resArenaMgr.setDatasetId(resSlot, dsId);
resArenaMgr.setPkHashVal(resSlot, entityHashValue);
-
- if (group.firstResourceIndex.get() == -1) {
- group.firstResourceIndex.set(resSlot);
- }
+ resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
+ group.firstResourceIndex.set(resSlot);
}
return resSlot;
}
@@ -362,6 +391,35 @@
return -1;
}
+ /**
+ * when we've got a lock conflict for a different job, we always have to
+ * wait, if it is for the same job we either have to
+ * a) (wait and) convert the lock once conversion becomes viable or
+ * b) acquire the lock if we want to lock the same resource with the same
+ * lock mode for the same job.
+ * @param resource the resource slot that's being locked
+ * @param job the job slot of the job locking the resource
+ * @param lockMode the lock mode that the resource should be locked with
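+ * @return GET if the job already holds the resource in the requested mode, CONV if it only holds it in other modes, WAIT if it holds no lock on the resource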
+ */
+ private LockAction updateActionForSameJob(int resource, int job, byte lockMode) {
+ // TODO we can reduce the number of things we have to look at by carefully
+ // distinguishing the different lock modes
+ int holder = resArenaMgr.getLastHolder(resource);
+ LockAction res = LockAction.WAIT;
+ while (holder != -1) {
+ if (job == reqArenaMgr.getJobId(holder)) {
+ if (reqArenaMgr.getLockMode(holder) == lockMode) {
+ return LockAction.GET;
+ } else {
+ res = LockAction.CONV;
+ }
+ }
+ holder = reqArenaMgr.getNextRequest(holder);
+ }
+ return res;
+ }
+
private int findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
int resSlot = group.firstResourceIndex.get();
while (resSlot != -1) {
@@ -570,6 +628,25 @@
&& resArenaMgr.getFirstWaiter(resource) == -1;
}
+ private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ op : ").append(string);
+ if (id != -1) {
+ sb.append(" , dataset : ").append(id);
+ }
+ if (entityHashValue != -1) {
+ sb.append(" , entity : ").append(entityHashValue);
+ }
+ if (lockMode != LockMode.NL) {
+ sb.append(" , mode : ").append(LockMode.toString(lockMode));
+ }
+ if (txnContext != null) {
+ sb.append(" , jobId : ").append(txnContext.getJobId());
+ }
+ sb.append(" }");
+ //System.err.println("XXX" + sb.toString());
+ }
+
private void validateJob(ITransactionContext txnContext) throws ACIDException {
if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
@@ -896,10 +973,12 @@
}
void getLatch() {
+ log("latch " + toString());
latch.writeLock().lock();
}
void releaseLatch() {
+ log("release " + toString());
latch.writeLock().unlock();
}
@@ -907,12 +986,28 @@
return latch.hasQueuedThreads();
}
- void await() throws InterruptedException {
- condition.await();
+ void await(ITransactionContext txnContext) throws ACIDException {
+ log("wait for " + toString());
+ try {
+ condition.await();
+ } catch (InterruptedException e) {
+ throw new ACIDException(txnContext, "interrupted", e);
+ }
}
void wakeUp() {
+ log("notify " + toString());
condition.signalAll();
}
+
+ void log(String s) {
+ //System.out.println("XXXX " + s);
+ }
+
+ public String toString() {
+ return "{ id : " + hashCode()
+ + ", first : " + firstResourceIndex.toString()
+ + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
+ }
}
}