include upgraders in deadlock detection
continue walking though buffers when allocating in the RecordManager (instead of starting over every time)
throw if a NL lock is requested
adapt NO_SLOTS
give 2 dimensions to LockAction enum
some factorization of lock and tryLock methods
use java.util.logging in the ConcurrentLockManager
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index eeaf27a..9afc673 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -23,7 +23,7 @@
public static final boolean CHECK_SLOTS = @DEBUG@;
public static final boolean TRACK_ALLOC_LOC = @DEBUG@;
- static final int NO_SLOTS = 10;
+ static final int NO_SLOTS = 1000;
@CONSTS@
@@ -54,10 +54,12 @@
synchronized int allocate() {
if (buffers.get(current).isFull()) {
final int size = buffers.size();
+ final int start = current + 1;
SlotSource source = SlotSource.NEW;
- for (int i = 0; i < size; ++i) {
+ for (int j = start; j < start + size; ++j) {
// If we find a buffer with space, we use it. Otherwise we
// remember the first uninitialized one and use that one.
+ final int i = j % size;
final Buffer buffer = buffers.get(i);
if (buffer.isInitialized() && ! buffer.isFull()) {
source = SlotSource.NON_FULL;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 717f6c4..9f98680 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -22,6 +22,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -41,6 +43,8 @@
public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+ private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
+
public static final boolean IS_DEBUG_MODE = false;//true
private TransactionSubsystem txnSubsystem;
@@ -53,19 +57,27 @@
private ThreadLocal<DatasetLockCache> dsLockCache;
enum LockAction {
- GET,
- UPD, // special version of GET that updates the max lock mode
- WAIT,
- CONV // convert (upgrade) a lock (e.g. from S to X)
+ ERR(false, false),
+ GET(false, false),
+ UPD(false, true), // special version of GET that updates the max lock mode
+ WAIT(true, false),
+ CONV(true, true) // convert (upgrade) a lock (e.g. from S to X)
+ ;
+ boolean wait;
+ boolean modify;
+ LockAction(boolean wait, boolean modify) {
+ this.wait = wait;
+ this.modify = modify;
+ }
}
static LockAction[][] ACTION_MATRIX = {
// new NL IS IX S X
- { LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
- { LockAction.GET, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
- { LockAction.GET, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
- { LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
- { LockAction.GET, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+ { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
+ { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
+ { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
+ { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
+ { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
};
public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
@@ -127,11 +139,7 @@
boolean locked = false;
while (! locked) {
- int curLockMode = resArenaMgr.getMaxMode(resSlot);
- LockAction act = ACTION_MATRIX[curLockMode][lockMode];
- if (act == LockAction.WAIT) {
- act = updateActionForSameJob(resSlot, jobSlot, lockMode);
- }
+ LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
@@ -141,21 +149,20 @@
locked = true;
break;
case WAIT:
+ case CONV:
+ final Queue queue = act.modify ? Upgrader : Waiter;
if (! introducesDeadlock(resSlot, jobSlot)) {
- addWaiter(reqSlot, resSlot, jobSlot);
+ queue.add(reqSlot, resSlot, jobSlot);
} else {
requestAbort(txnContext);
}
- group.await(txnContext);
- removeWaiter(reqSlot, resSlot, jobSlot);
+ try {
+ group.await(txnContext);
+ } finally {
+ queue.remove(reqSlot, resSlot, jobSlot);
+ }
break;
- case CONV:
- // TODO can we have more than on upgrader? Or do we need to
- // abort if we get a second upgrader?
- addUpgrader(reqSlot, resSlot, jobSlot);
- group.await(txnContext);
- removeUpgrader(reqSlot, resSlot, jobSlot);
- break;
+ case ERR:
default:
throw new IllegalStateException();
}
@@ -179,6 +186,7 @@
if (holderJobSlot == jobSlot) {
return true;
}
+ boolean scanWaiters = true;
long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
while (waiter >= 0) {
long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
@@ -186,6 +194,10 @@
return true;
}
waiter = reqArenaMgr.getNextJobRequest(waiter);
+ if (waiter < 0 && scanWaiters) {
+ scanWaiters = false;
+ waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
+ }
}
reqSlot = reqArenaMgr.getNextRequest(reqSlot);
}
@@ -236,11 +248,7 @@
long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
// 3) check lock compatibility
- int curLockMode = resArenaMgr.getMaxMode(resSlot);
- LockAction act = ACTION_MATRIX[curLockMode][lockMode];
- if (act == LockAction.WAIT) {
- act = updateActionForSameJob(resSlot, jobSlot, lockMode);
- }
+ LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
@@ -396,6 +404,15 @@
return reqSlot;
}
+ private LockAction determineLockAction(long resSlot, long jobSlot, byte lockMode) {
+ final int curLockMode = resArenaMgr.getMaxMode(resSlot);
+ final LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+ if (act == LockAction.WAIT) {
+ return updateActionForSameJob(resSlot, jobSlot, lockMode);
+ }
+ return act;
+ }
+
/**
* when we've got a lock conflict for a different job, we always have to
* wait, if it is for the same job we either have to
@@ -495,67 +512,73 @@
}
}
- private void addWaiter(long request, long resource, long job) {
- long waiter = resArenaMgr.getFirstWaiter(resource);
- reqArenaMgr.setNextRequest(request, -1);
- if (waiter == -1) {
- resArenaMgr.setFirstWaiter(resource, request);
- } else {
- appendToRequestQueue(waiter, request);
+ interface Queue {
+ void add(long request, long resource, long job);
+ void remove(long request, long resource, long job);
+ }
+
+ Queue Waiter = new Queue() {
+ public void add(long request, long resource, long job) {
+ long waiter = resArenaMgr.getFirstWaiter(resource);
+ reqArenaMgr.setNextRequest(request, -1);
+ if (waiter == -1) {
+ resArenaMgr.setFirstWaiter(resource, request);
+ } else {
+ appendToRequestQueue(waiter, request);
+ }
+ synchronized (jobArenaMgr) {
+ waiter = jobArenaMgr.getLastWaiter(job);
+ insertIntoJobQueue(request, waiter);
+ jobArenaMgr.setLastWaiter(job, request);
+ }
}
+ public void remove(long request, long resource, long job) {
+ long waiter = resArenaMgr.getFirstWaiter(resource);
+ if (waiter == request) {
+ long next = reqArenaMgr.getNextRequest(waiter);
+ resArenaMgr.setFirstWaiter(resource, next);
+ } else {
+ waiter = removeRequestFromQueueForSlot(waiter, request);
+ }
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(job, waiter);
+ jobArenaMgr.setLastWaiter(job, newHead);
+ }
+ }
+ };
- synchronized (jobArenaMgr) {
- waiter = jobArenaMgr.getLastWaiter(job);
- insertIntoJobQueue(request, waiter);
- jobArenaMgr.setLastWaiter(job, request);
+ Queue Upgrader = new Queue() {
+ public void add(long request, long resource, long job) {
+ long upgrader = resArenaMgr.getFirstUpgrader(resource);
+ reqArenaMgr.setNextRequest(request, -1);
+ if (upgrader == -1) {
+ resArenaMgr.setFirstUpgrader(resource, request);
+ } else {
+ appendToRequestQueue(upgrader, request);
+ }
+ synchronized (jobArenaMgr) {
+ upgrader = jobArenaMgr.getLastUpgrader(job);
+ insertIntoJobQueue(request, upgrader);
+ jobArenaMgr.setLastUpgrader(job, request);
+ }
}
- }
-
- private void removeWaiter(long request, long resource, long job) {
- long waiter = resArenaMgr.getFirstWaiter(resource);
- if (waiter == request) {
- long next = reqArenaMgr.getNextRequest(waiter);
- resArenaMgr.setFirstWaiter(resource, next);
- } else {
- waiter = removeRequestFromQueueForSlot(waiter, request);
+ public void remove(long request, long resource, long job) {
+ long upgrader = resArenaMgr.getFirstUpgrader(resource);
+ if (upgrader == request) {
+ long next = reqArenaMgr.getNextRequest(upgrader);
+ resArenaMgr.setFirstUpgrader(resource, next);
+ } else {
+ upgrader = removeRequestFromQueueForSlot(upgrader, request);
+ }
+ synchronized (jobArenaMgr) {
+ // remove from the list of requests for a job
+ long newHead = removeRequestFromJob(job, upgrader);
+ jobArenaMgr.setLastUpgrader(job, newHead);
+ }
}
- synchronized (jobArenaMgr) {
- // remove from the list of requests for a job
- long newHead = removeRequestFromJob(job, waiter);
- jobArenaMgr.setLastWaiter(job, newHead);
- }
- }
-
- private void addUpgrader(long request, long resource, long job) {
- long upgrader = resArenaMgr.getFirstUpgrader(resource);
- reqArenaMgr.setNextRequest(request, -1);
- if (upgrader == -1) {
- resArenaMgr.setFirstUpgrader(resource, request);
- } else {
- appendToRequestQueue(upgrader, request);
- }
- synchronized (jobArenaMgr) {
- upgrader = jobArenaMgr.getLastUpgrader(job);
- insertIntoJobQueue(request, upgrader);
- jobArenaMgr.setLastUpgrader(job, request);
- }
- }
-
- private void removeUpgrader(long request, long resource, long job) {
- long upgrader = resArenaMgr.getFirstUpgrader(resource);
- if (upgrader == request) {
- long next = reqArenaMgr.getNextRequest(upgrader);
- resArenaMgr.setFirstUpgrader(resource, next);
- } else {
- upgrader = removeRequestFromQueueForSlot(upgrader, request);
- }
- synchronized (jobArenaMgr) {
- // remove from the list of requests for a job
- long newHead = removeRequestFromJob(job, upgrader);
- jobArenaMgr.setLastUpgrader(job, newHead);
- }
- }
-
+ };
+
private void insertIntoJobQueue(long newRequest, long oldRequest) {
reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
reqArenaMgr.setPrevJobRequest(newRequest, -1);
@@ -654,6 +677,9 @@
}
private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+ if (! LOGGER.isLoggable(Level.FINEST)) {
+ return;
+ }
StringBuilder sb = new StringBuilder();
sb.append("{ op : ").append(string);
if (id != -1) {
@@ -669,7 +695,7 @@
sb.append(" , jobId : ").append(txnContext.getJobId());
}
sb.append(" }");
- //System.err.println("XXX" + sb.toString());
+ LOGGER.finest(sb.toString());
}
private void validateJob(ITransactionContext txnContext) throws ACIDException {
@@ -796,6 +822,7 @@
ResourceGroup get(DatasetId dId, int entityHashValue) {
// TODO ensure good properties of hash function
int h = Math.abs(dId.getId() ^ entityHashValue);
+ if (h < 0) h = 0;
return table[h % TABLE_SIZE];
}
@@ -829,12 +856,12 @@
}
void getLatch() {
- log("latch " + toString());
+ log("latch");
latch.writeLock().lock();
}
void releaseLatch() {
- log("release " + toString());
+ log("release");
latch.writeLock().unlock();
}
@@ -843,7 +870,7 @@
}
void await(ITransactionContext txnContext) throws ACIDException {
- log("wait for " + toString());
+ log("wait for");
try {
condition.await();
} catch (InterruptedException e) {
@@ -852,12 +879,15 @@
}
void wakeUp() {
- log("notify " + toString());
+ log("notify");
condition.signalAll();
}
void log(String s) {
- //System.out.println("XXXX " + s);
+ if (! LOGGER.isLoggable(Level.FINEST)) {
+ return;
+ }
+ LOGGER.finest(s + " " + toString());
}
public String toString() {