some more cleanup (mostly removal of debug/logging code)
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index ee66969..39a344e 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -23,7 +23,7 @@
public static final boolean CHECK_SLOTS = @DEBUG@;
public static final boolean TRACK_ALLOC_LOC = @DEBUG@;
- static final int NO_SLOTS = 20;
+ static final int NO_SLOTS = 1000;
@CONSTS@
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 2924756..6d310f3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -166,17 +167,6 @@
private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
final LockAction act, ITransactionContext txnContext) throws ACIDException {
- if (LOGGER.isLoggable(LVL)) {
- StringBuilder sb = new StringBuilder();
- final int jobId = jobArenaMgr.getJobId(jobSlot);
- final int datasetId = resArenaMgr.getDatasetId(resSlot);
- final int pkHashVal = resArenaMgr.getPkHashVal(resSlot);
- sb.append("job " + jobId + " wants to ");
- sb.append(act.modify ? "upgrade lock" : "wait");
- sb.append(" for [" + datasetId + ", " + pkHashVal + "]");
- LOGGER.log(LVL, sb.toString());
- }
-
final Queue queue = act.modify ? upgrader : waiter;
if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
DeadlockTracker tracker = new CollectingTracker();
@@ -564,15 +554,8 @@
jobArenaMgr.deallocate(jobSlot);
jobIdSlotMap.remove(jobId);
stats.logCounters(LOGGER, Level.INFO, true);
- if (LOGGER.isLoggable(LVL)) {
- LOGGER.log(LVL, "after releaseLocks");
- LOGGER.log(LVL, "jobArenaMgr " + jobArenaMgr.addTo(new RecordManagerStats()).toString());
- LOGGER.log(LVL, "resArenaMgr " + resArenaMgr.addTo(new RecordManagerStats()).toString());
- LOGGER.log(LVL, "reqArenaMgr " + reqArenaMgr.addTo(new RecordManagerStats()).toString());
- }
- //LOGGER.log(LVL, toString());
}
-
+
private long findOrAllocJobSlot(int jobId) {
Long jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
@@ -626,11 +609,10 @@
private LockAction determineLockAction(long resSlot, long jobSlot, byte lockMode) {
final int curLockMode = resArenaMgr.getMaxMode(resSlot);
- LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+ final LockAction act = ACTION_MATRIX[curLockMode][lockMode];
if (act == LockAction.WAIT) {
- act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+ return updateActionForSameJob(resSlot, jobSlot, lockMode);
}
- LOGGER.log(LVL, "determineLockAction(" + resSlot + ", " + jobSlot + ", " + lockMode + ") -> " + act);
return act;
}
@@ -692,7 +674,6 @@
insertIntoJobQueue(request, lastJobHolder);
jobArenaMgr.setLastHolder(job, request);
}
- //assertLocksCanBefound();
}
private long removeLastHolder(long resource, long jobSlot, byte lockMode) throws ACIDException {
@@ -700,6 +681,7 @@
if (holder < 0) {
throw new IllegalStateException("no holder for resource " + resource);
}
+
// remove from the list of holders for a resource
if (requestMatches(holder, jobSlot, lockMode)) {
// if the head of the queue matches, we need to update the resource
@@ -708,6 +690,7 @@
} else {
holder = removeRequestFromQueueForJob(holder, jobSlot, lockMode);
}
+
synchronized (jobArenaMgr) {
// remove from the list of requests for a job
long newHead = removeRequestFromJob(holder, jobArenaMgr.getLastHolder(jobSlot));
@@ -755,7 +738,6 @@
insertIntoJobQueue(request, waiter);
jobArenaMgr.setLastWaiter(job, request);
}
- //assertLocksCanBefound();
}
public void remove(long request, long resource, long job) {
@@ -771,7 +753,6 @@
long newHead = removeRequestFromJob(waiter, jobArenaMgr.getLastWaiter(job));
jobArenaMgr.setLastWaiter(job, newHead);
}
- //assertLocksCanBefound();
}
};
@@ -789,7 +770,6 @@
insertIntoJobQueue(request, upgrader);
jobArenaMgr.setLastUpgrader(job, request);
}
- //assertLocksCanBefound();
}
public void remove(long request, long resource, long job) {
@@ -805,12 +785,10 @@
long newHead = removeRequestFromJob(upgrader, jobArenaMgr.getLastUpgrader(job));
jobArenaMgr.setLastUpgrader(job, newHead);
}
- //assertLocksCanBefound();
}
};
private void insertIntoJobQueue(long newRequest, long oldRequest) {
- LOGGER.log(LVL, "insertIntoJobQueue(" + newRequest + ", " + oldRequest + ")");
reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
reqArenaMgr.setPrevJobRequest(newRequest, -1);
if (oldRequest >= 0) {
@@ -1049,25 +1027,28 @@
private void assertLocksCanBefoundInJobQueue() throws ACIDException {
for (int i = 0; i < ResourceGroupTable.TABLE_SIZE; ++i) {
- final ResourceGroup group = table.table[i];
- group.waitForLatch();
- try {
- long resSlot = group.firstResourceIndex.get();
- while (resSlot != -1) {
- int dsId = resArenaMgr.getDatasetId(resSlot);
- int entityHashValue = resArenaMgr.getPkHashVal(resSlot);
- long reqSlot = resArenaMgr.getLastHolder(resSlot);
- while (reqSlot != -1) {
- byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot);
- long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
- int jobId = jobArenaMgr.getJobId(jobSlot);
- assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
- reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ final ResourceGroup group = table.get(i);
+ if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
+ try {
+ long resSlot = group.firstResourceIndex.get();
+ while (resSlot != -1) {
+ int dsId = resArenaMgr.getDatasetId(resSlot);
+ int entityHashValue = resArenaMgr.getPkHashVal(resSlot);
+ long reqSlot = resArenaMgr.getLastHolder(resSlot);
+ while (reqSlot != -1) {
+ byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot);
+ long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ int jobId = jobArenaMgr.getJobId(jobSlot);
+ assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
+ reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ }
+ resSlot = resArenaMgr.getNext(resSlot);
}
- resSlot = resArenaMgr.getNext(resSlot);
+ } finally {
+ group.releaseLatch();
}
- } finally {
- group.releaseLatch();
+ } else {
+ LOGGER.warning("Could not check locks for " + group);
}
}
}
@@ -1217,7 +1198,7 @@
private static class ResourceGroupTable {
public static final int TABLE_SIZE = 1024; // TODO increase?
- ResourceGroup[] table;
+ private ResourceGroup[] table;
public ResourceGroupTable() {
table = new ResourceGroup[TABLE_SIZE];
@@ -1225,13 +1206,17 @@
table[i] = new ResourceGroup();
}
}
-
+
ResourceGroup get(int dId, int entityHashValue) {
// TODO ensure good properties of hash function
int h = Math.abs(dId ^ entityHashValue);
if (h < 0) h = 0;
return table[h % TABLE_SIZE];
}
+
+ ResourceGroup get(int i) {
+ return table[i];
+ }
public void getAllLatches() {
for (int i = 0; i < TABLE_SIZE; ++i) {
@@ -1279,15 +1264,13 @@
latch.writeLock().lock();
}
- void waitForLatch() throws ACIDException {
- log("waitForLatch");
- while (!latch.writeLock().tryLock()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.finer("interrupted while sleeping");
- throw new ACIDException("interrupted", e);
- }
+ boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException {
+ log("tryLatch");
+ try {
+ return latch.writeLock().tryLock(timeout, unit);
+ } catch (InterruptedException e) {
+ LOGGER.finer("interrupted while wating on ResourceGroup");
+ throw new ACIDException("interrupted", e);
}
}
@@ -1316,7 +1299,7 @@
}
void log(String s) {
- if (LOGGER.isLoggable(Level.FINER)) {
+ if (LOGGER.isLoggable(LVL)) {
LOGGER.log(LVL, s + " " + toString());
}
}