add simple deadlock detection
add JobArenaManager
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
index 7c932f4..2768916 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -83,6 +83,14 @@
request.addField("next request", RecordType.Type.INT, null);
typeMap.put(request.name, request);
+
+ RecordType job = new RecordType("Job");
+ job.addField("job id", RecordType.Type.INT, null);
+ job.addField("last holder", RecordType.Type.INT, "-1");
+ job.addField("last waiter", RecordType.Type.INT, "-1");
+ job.addField("last upgrader", RecordType.Type.INT, "-1");
+
+ typeMap.put(job.name, job);
}
public void execute() throws MojoExecutionException {
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 67dc5a0..8b2eb06 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -40,7 +40,11 @@
<configuration>
<arenaManagerTemplate>ArenaManager.java</arenaManagerTemplate>
<recordManagerTemplate>RecordManager.java</recordManagerTemplate>
- <recordTypes><param>Request</param><param>Resource</param></recordTypes>
+ <recordTypes>
+ <param>Job</param>
+ <param>Request</param>
+ <param>Resource</param>
+ </recordTypes>
<outputDir>${project.build.directory}/generated-sources/java/edu/uci/ics/asterix/transaction/management/service/locking</outputDir>
</configuration>
<executions>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index a208a93..d34d301 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -50,7 +50,8 @@
private ResourceGroupTable table;
private ResourceArenaManager resArenaMgr;
private RequestArenaManager reqArenaMgr;
- private ConcurrentHashMap<Integer, Integer> jobReqMap; // TODO different impl
+ private JobArenaManager jobArenaMgr;
+ private ConcurrentHashMap<Integer, Integer> jobIdSlotMap;
enum LockAction {
GET,
@@ -77,7 +78,8 @@
resArenaMgr = new ResourceArenaManager();
reqArenaMgr = new RequestArenaManager();
- jobReqMap = new ConcurrentHashMap<>();
+ jobArenaMgr = new JobArenaManager();
+ jobIdSlotMap = new ConcurrentHashMap<>();
}
public AsterixTransactionProperties getTransactionProperties() {
@@ -95,16 +97,17 @@
}
int dsId = datasetId.getId();
- int jobId = txnContext.getJobId().getId();
-
+
+ int jobSlot = getJobSlot(txnContext.getJobId().getId());
+
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
try {
// 1) Find the resource in the hash table
- int resSlot = findResource(group, dsId, entityHashValue);
+ int resSlot = getResourceSlot(group, dsId, entityHashValue);
// 2) create a request entry
- int reqSlot = createRequest(resSlot, jobId, lockMode);
+ int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
// 3) check lock compatibility
boolean locked = false;
@@ -115,16 +118,20 @@
resArenaMgr.setMaxMode(resSlot, lockMode);
// no break
case GET:
- addHolderToResource(resSlot, reqSlot);
+ addHolder(reqSlot, resSlot, jobSlot);
locked = true;
break;
case WAIT:
// TODO can we have more than on upgrader? Or do we need to
// abort if we get a second upgrader?
- if (findLastHolderForJob(resSlot, jobId) != -1) {
- addUpgraderToResource(resSlot, reqSlot);
+ if (findLastHolderForJob(resSlot, jobSlot) != -1) {
+ addUpgrader(reqSlot, resSlot, jobSlot);
} else {
- addWaiterToResource(resSlot, reqSlot);
+ if (! introducesDeadlock(resSlot, jobSlot)) {
+ addWaiter(reqSlot, resSlot, jobSlot);
+ } else {
+ requestAbort(txnContext);
+ }
}
try {
group.await();
@@ -134,14 +141,40 @@
break;
default:
throw new IllegalStateException();
- }
- // TODO where do we check for deadlocks?
+ }
}
} finally {
group.releaseLatch();
}
}
+ /**
+ * determine if adding a job to the waiters of a resource will introduce a
+ * cycle in the wait-graph where the job waits on itself
+ * @param resSlot the slot that contains the information about the resource
+ * @param jobSlot the slot that contains the information about the job
+ * @return true if a cycle would be introduced, false otherwise
+ */
+ private boolean introducesDeadlock(int resSlot, int jobSlot) {
+ int reqSlot = resArenaMgr.getLastHolder(resSlot);
+ while (reqSlot >= 0) {
+ int holderJobSlot = reqArenaMgr.getJobId(reqSlot);
+ if (holderJobSlot == jobSlot) {
+ return true;
+ }
+ int waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+ while (waiter >= 0) {
+ int watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+ if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+ return true;
+ }
+ waiter = reqArenaMgr.getNextJobRequest(waiter);
+ }
+ reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ }
+ return false;
+ }
+
@Override
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
@@ -162,16 +195,16 @@
}
int dsId = datasetId.getId();
- int jobId = txnContext.getJobId().getId();
-
+ int jobSlot = getJobSlot(txnContext.getJobId().getId());
+
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
try {
// 1) Find the resource in the hash table
- int resSlot = findResource(group, dsId, entityHashValue);
+ int resSlot = getResourceSlot(group, dsId, entityHashValue);
// 2) create a request entry
- int reqSlot = createRequest(resSlot, jobId, lockMode);
+ int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
// 3) check lock compatibility
int curLockMode = resArenaMgr.getMaxMode(resSlot);
@@ -180,7 +213,7 @@
resArenaMgr.setMaxMode(resSlot, lockMode);
// no break
case GET:
- addHolderToResource(resSlot, reqSlot);
+ addHolder(reqSlot, resSlot, jobSlot);
return true;
case WAIT:
return false;
@@ -217,6 +250,7 @@
}
int jobId = txnContext.getJobId().getId();
+ int jobSlot = getJobSlot(jobId);
// since locking is properly nested, finding the last holder is good enough
int holder = resArenaMgr.getLastHolder(resource);
@@ -225,7 +259,7 @@
}
// remove from the list of holders
- if (reqArenaMgr.getJobId(holder) == jobId) {
+ if (reqArenaMgr.getJobId(holder) == jobSlot) {
int next = reqArenaMgr.getNextRequest(holder);
resArenaMgr.setLastHolder(resource, next);
} else {
@@ -235,7 +269,7 @@
if (holder == -1) {
throw new IllegalStateException("no holder for resource " + resource + " and job " + jobId);
}
- if (jobId == reqArenaMgr.getJobId(holder)) {
+ if (jobSlot == reqArenaMgr.getJobId(holder)) {
break;
}
prev = holder;
@@ -248,11 +282,7 @@
int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
int nextForJob = reqArenaMgr.getNextJobRequest(holder);
if (prevForJob == -1) {
- if (nextForJob == -1) {
- jobReqMap.remove(jobId);
- } else {
- jobReqMap.put(jobId, nextForJob);
- }
+ jobArenaMgr.setLastHolder(jobSlot, nextForJob);
} else {
reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
}
@@ -276,7 +306,7 @@
resArenaMgr.deallocate(resource);
} else {
final int oldMaxMode = resArenaMgr.getMaxMode(resource);
- final int newMaxMode = getNewMaxMode(resource, oldMaxMode);
+ final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
resArenaMgr.setMaxMode(resource, newMaxMode);
if (oldMaxMode != newMaxMode) {
// the locking mode didn't change, current waiters won't be
@@ -296,17 +326,36 @@
@Override
public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
int jobId = txnContext.getJobId().getId();
- Integer head = jobReqMap.get(jobId);
- while (head != null) {
- int resource = reqArenaMgr.getResourceId(head);
+ Integer jobSlot = jobIdSlotMap.get(jobId);
+ int holder = jobArenaMgr.getLastHolder(jobSlot);
+ while (holder != -1) {
+ int resource = reqArenaMgr.getResourceId(holder);
int dsId = resArenaMgr.getDatasetId(resource);
int pkHashVal = resArenaMgr.getPkHashVal(resource);
unlock(new DatasetId(dsId), pkHashVal, txnContext);
- head = jobReqMap.get(jobId);
+ holder = jobArenaMgr.getLastHolder(jobSlot);
}
+ jobArenaMgr.deallocate(jobSlot);
}
- private int findResource(ResourceGroup group, int dsId, int entityHashValue) {
+ private int getJobSlot(int jobId) {
+ Integer jobSlot = jobIdSlotMap.get(jobId);
+ if (jobSlot == null) {
+ jobSlot = new Integer(jobArenaMgr.allocate());
+ Integer oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+ if (oldSlot != null) {
+ // if another thread allocated a slot for this jobId between
+ // get(..) and putIfAbsent(..), we'll use that slot and
+ // deallocate the one we allocated
+ jobArenaMgr.deallocate(jobSlot);
+ jobSlot = oldSlot;
+ }
+ }
+ assert(jobSlot >= 0);
+ return jobSlot;
+ }
+
+ private int getResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
int resSlot = findResourceInGroup(group, dsId, entityHashValue);
if (resSlot == -1) {
@@ -322,29 +371,11 @@
return resSlot;
}
- private int createRequest(int resSlot, int jobId, byte lockMode) {
+ private int getRequestSlot(int resSlot, int jobSlot, byte lockMode) {
int reqSlot = reqArenaMgr.allocate();
reqArenaMgr.setResourceId(reqSlot, resSlot);
reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
- reqArenaMgr.setJobId(reqSlot, jobId);
-
- int prevHead = -1;
- Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
- while (headOfJobReqQueue != null) {
- // TODO make sure this works (even if the job gets removed from the table)
- if (jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
- prevHead = headOfJobReqQueue;
- break;
- }
- headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
- }
-
- // this goes across arenas
- reqArenaMgr.setNextJobRequest(reqSlot, prevHead);
- reqArenaMgr.setPrevJobRequest(reqSlot, -1);
- if (prevHead >= 0) {
- reqArenaMgr.setPrevJobRequest(prevHead, reqSlot);
- }
+ reqArenaMgr.setJobId(reqSlot, jobSlot);
return reqSlot;
}
@@ -374,13 +405,17 @@
return -1;
}
- private void addHolderToResource(int resource, int request) {
- final int lastHolder = resArenaMgr.getLastHolder(resource);
+ private void addHolder(int request, int resource, int job) {
+ int lastHolder = resArenaMgr.getLastHolder(resource);
reqArenaMgr.setNextRequest(request, lastHolder);
resArenaMgr.setLastHolder(resource, request);
+
+ lastHolder = jobArenaMgr.getLastHolder(job);
+ insertIntoJobQueue(request, lastHolder);
+ jobArenaMgr.setLastHolder(resource, request);
}
- private void addWaiterToResource(int resource, int request) {
+ private void addWaiter(int request, int resource, int job) {
int waiter = resArenaMgr.getFirstWaiter(resource);
reqArenaMgr.setNextRequest(request, -1);
if (waiter == -1) {
@@ -388,9 +423,13 @@
} else {
appendToRequestQueue(waiter, request);
}
+
+ waiter = jobArenaMgr.getLastWaiter(job);
+ insertIntoJobQueue(request, waiter);
+ jobArenaMgr.setLastWaiter(job, request);
}
- private void addUpgraderToResource(int resource, int request) {
+ private void addUpgrader(int request, int resource, int job) {
int upgrader = resArenaMgr.getFirstUpgrader(resource);
reqArenaMgr.setNextRequest(request, -1);
if (upgrader == -1) {
@@ -398,6 +437,18 @@
} else {
appendToRequestQueue(upgrader, request);
}
+
+ upgrader = jobArenaMgr.getLastUpgrader(job);
+ insertIntoJobQueue(request, upgrader);
+ jobArenaMgr.setLastUpgrader(job, request);
+ }
+
+ private void insertIntoJobQueue(int newRequest, int oldRequest) {
+ reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
+ reqArenaMgr.setPrevJobRequest(newRequest, -1);
+ if (oldRequest >= 0) {
+ reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
+ }
}
private void appendToRequestQueue(int head, int appendee) {
@@ -409,7 +460,7 @@
reqArenaMgr.setNextRequest(head, appendee);
}
- private int getNewMaxMode(int resource, int oldMaxMode) {
+ private int determineNewMaxMode(int resource, int oldMaxMode) {
int newMaxMode = LockMode.NL;
int holder = resArenaMgr.getLastHolder(resource);
while (holder != -1) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 09d7000..2426927 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -32,6 +32,7 @@
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -151,6 +152,7 @@
if (isSuccess) {
log("\n*** Test Passed ***");
}
+ ((LogManager) txnProvider.getLogManager()).stop(false, null);
}
public boolean handleRequest(LockRequest request) throws ACIDException {