add DeadlockTracker
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 3dc4fa6..ba8d86d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -163,10 +164,13 @@
private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
final LockAction act, ITransactionContext txnContext) throws ACIDException {
final Queue queue = act.modify ? upgrader : waiter;
- if (!introducesDeadlock(resSlot, jobSlot)) {
- queue.add(reqSlot, resSlot, jobSlot);
+ if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
+ DeadlockTracker tracker = new CollectingTracker();
+ tracker.pushJob(jobSlot);
+ introducesDeadlock(resSlot, jobSlot, tracker);
+ requestAbort(txnContext, tracker.toString());
} else {
- requestAbort(txnContext);
+ queue.add(reqSlot, resSlot, jobSlot);
}
try {
group.await(txnContext);
@@ -175,6 +179,64 @@
}
}
+ interface DeadlockTracker {
+ void pushResource(long resSlot);
+ void pushRequest(long reqSlot);
+ void pushJob(long jobSlot);
+ void pop();
+ }
+
+ static class NOPTracker implements DeadlockTracker {
+ static final DeadlockTracker INSTANCE = new NOPTracker();
+
+ public void pushResource(long resSlot) {}
+ public void pushRequest(long reqSlot) {}
+ public void pushJob(long jobSlot) {}
+ public void pop() {}
+ }
+
+ static class CollectingTracker implements DeadlockTracker {
+ ArrayList<Long> slots = new ArrayList<Long>();
+ ArrayList<String> types = new ArrayList<String>();
+
+ @Override
+ public void pushResource(long resSlot) {
+ types.add("Resource");
+ slots.add(resSlot);
+ System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ }
+
+ @Override
+ public void pushRequest(long reqSlot) {
+ types.add("Request");
+ slots.add(reqSlot);
+ System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ }
+
+ @Override
+ public void pushJob(long jobSlot) {
+ types.add("Job");
+ slots.add(jobSlot);
+ System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ }
+
+ @Override
+ public void pop() {
+ System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ types.remove(types.size() - 1);
+ slots.remove(slots.size() - 1);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < slots.size(); ++i) {
+ sb.append(types.get(i) + " " + slots.get(i) + "\n");
+ }
+ return sb.toString();
+ }
+ }
+
/**
* determine if adding a job to the waiters of a resource will introduce a
* cycle in the wait-graph where the job waits on itself
@@ -185,11 +247,15 @@
* the slot that contains the information about the job
* @return true if a cycle would be introduced, false otherwise
*/
- private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
+ private boolean introducesDeadlock(final long resSlot, final long jobSlot,
+ final DeadlockTracker tracker) {
synchronized (jobArenaMgr) {
+ tracker.pushResource(resSlot);
long reqSlot = resArenaMgr.getLastHolder(resSlot);
while (reqSlot >= 0) {
- long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ tracker.pushRequest(reqSlot);
+ final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ tracker.pushJob(holderJobSlot);
if (holderJobSlot == jobSlot) {
return true;
}
@@ -197,7 +263,7 @@
long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
while (waiter >= 0) {
long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
- if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+ if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) {
return true;
}
waiter = reqArenaMgr.getNextJobRequest(waiter);
@@ -206,12 +272,15 @@
waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
}
}
+ tracker.pop(); // job
+ tracker.pop(); // request
reqSlot = reqArenaMgr.getNextRequest(reqSlot);
}
+ tracker.pop(); // resource
return false;
}
}
-
+
@Override
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
@@ -834,14 +903,14 @@
if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
} else if (txnContext.isTimeout()) {
- requestAbort(txnContext);
+ requestAbort(txnContext, "timeout");
}
}
- private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+ private void requestAbort(ITransactionContext txnContext, String msg) throws ACIDException {
txnContext.setTimeout(true);
throw new ACIDException("Transaction " + txnContext.getJobId()
- + " should abort (requested by the Lock Manager)");
+ + " should abort (requested by the Lock Manager)" + ":\n" + msg);
}
public StringBuilder append(StringBuilder sb) {