remove jobId from jobIdSlotMap in releaseLocks
add some counters
add some logging
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index c4eeb9d..6319062 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -43,7 +43,8 @@
public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
-
+ private static final Level LVL = Level.FINER;
+
public static final boolean IS_DEBUG_MODE = false;//true
private TransactionSubsystem txnSubsystem;
@@ -53,6 +54,13 @@
private JobArenaManager jobArenaMgr;
private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
private ThreadLocal<DatasetLockCache> dsLockCache;
+
+ private volatile int lCnt;
+ private volatile int ilCnt;
+ private volatile int tlCnt;
+ private volatile int itlCnt;
+ private volatile int ulCnt;
+ private volatile int rlCnt;
enum LockAction {
ERR(false, false),
@@ -97,6 +105,13 @@
return new DatasetLockCache();
}
};
+
+ lCnt = 0;
+ ilCnt = 0;
+ tlCnt = 0;
+ itlCnt = 0;
+ ulCnt = 0;
+ rlCnt = 0;
}
public AsterixTransactionProperties getTransactionProperties() {
@@ -107,6 +122,7 @@
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ ++lCnt;
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
@@ -208,6 +224,7 @@
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ ++ilCnt;
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
@@ -265,6 +282,7 @@
} finally {
if (reqSlot != -1) {
// deallocate request, if we allocated one earlier
+ LOGGER.info("XXX del req slot " + TypeUtil.Global.toString(reqSlot));
reqArenaMgr.deallocate(reqSlot);
}
group.releaseLatch();
@@ -275,6 +293,7 @@
public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ ++tlCnt;
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
@@ -327,6 +346,7 @@
public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
ITransactionContext txnContext) throws ACIDException {
log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ ++itlCnt;
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
@@ -382,6 +402,7 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
log("unlock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ ++ulCnt;
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
@@ -399,6 +420,7 @@
long holder = removeLastHolder(resource, jobSlot, lockMode);
// deallocate request
+ LOGGER.info("XXX del req slot " + TypeUtil.Global.toString(holder));
reqArenaMgr.deallocate(holder);
// deallocate resource or fix max lock mode
if (resourceNotUsed(resource)) {
@@ -411,6 +433,7 @@
}
resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
}
+ LOGGER.info("XXX del res slot " + TypeUtil.Global.toString(resource));
resArenaMgr.deallocate(resource);
} else {
final int oldMaxMode = resArenaMgr.getMaxMode(resource);
@@ -432,6 +455,7 @@
@Override
public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
+ ++rlCnt;
int jobId = txnContext.getJobId().getId();
Long jobSlot = jobIdSlotMap.get(jobId);
@@ -439,6 +463,12 @@
// we don't know the job, so there are no locks for it - we're done
return;
}
+ //System.err.println(table.append(new StringBuilder(), true).toString());
+ if (LOGGER.isLoggable(LVL)) {
+ LOGGER.log(LVL, "jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
+ LOGGER.log(LVL, "resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
+ LOGGER.log(LVL, "reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+ }
synchronized (jobArenaMgr) {
long holder = jobArenaMgr.getLastHolder(jobSlot);
while (holder != -1) {
@@ -448,24 +478,26 @@
unlock(new DatasetId(dsId), pkHashVal, LockMode.ANY, txnContext);
holder = jobArenaMgr.getLastHolder(jobSlot);
}
+ LOGGER.info("XXX del job slot " + TypeUtil.Global.toString(jobSlot));
jobArenaMgr.deallocate(jobSlot);
- }
- //System.err.println(table.append(new StringBuilder(), true).toString());
- //System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
- //System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
- //System.out.println("reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+ jobIdSlotMap.remove(jobId);
+ }
+ logCounters();
+ //LOGGER.info(toString());
}
private long findOrAllocJobSlot(int jobId) {
Long jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
jobSlot = new Long(jobArenaMgr.allocate());
+ LOGGER.info("XXX new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + jobId + ")");
jobArenaMgr.setJobId(jobSlot, jobId);
Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
if (oldSlot != null) {
// if another thread allocated a slot for this jobId between
// get(..) and putIfAbsent(..), we'll use that slot and
// deallocate the one we allocated
+ LOGGER.info("XXX del job slot " + TypeUtil.Global.toString(jobSlot) + " due to conflict");
jobArenaMgr.deallocate(jobSlot);
jobSlot = oldSlot;
}
@@ -484,6 +516,9 @@
resArenaMgr.setPkHashVal(resSlot, entityHashValue);
resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
group.firstResourceIndex.set(resSlot);
+ LOGGER.info("XXX new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
+ } else {
+ LOGGER.info("XXX fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
}
return resSlot;
}
@@ -493,6 +528,10 @@
reqArenaMgr.setResourceId(reqSlot, resSlot);
reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+ LOGGER.info("XXX new req slot " + TypeUtil.Global.toString(reqSlot)
+ + " (" + TypeUtil.Global.toString(resSlot)
+ + ", " + TypeUtil.Global.toString(jobSlot)
+ + ", " + LockMode.toString(lockMode) + ")");
return reqSlot;
}
@@ -535,6 +574,11 @@
}
private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+
+ if ((lCnt + ilCnt + tlCnt + itlCnt + ulCnt + rlCnt) % 10000 == 0) {
+ logCounters();
+ }
+
long resSlot = group.firstResourceIndex.get();
while (resSlot != -1) {
// either we already have a lock on this resource or we have a
@@ -549,6 +593,18 @@
return -1;
}
+ private void logCounters() {
+ final Level lvl = Level.INFO;
+ if (LOGGER.isLoggable(lvl)) {
+ LOGGER.log(lvl, "number of lock requests : " + lCnt);
+ LOGGER.log(lvl, "number of instant lock requests : " + ilCnt);
+ LOGGER.log(lvl, "number of try lock requests : " + tlCnt);
+ LOGGER.log(lvl, "number of instant try lock requests : " + itlCnt);
+ LOGGER.log(lvl, "number of unlock requests : " + ulCnt);
+ LOGGER.log(lvl, "number of release locks requests : " + rlCnt);
+ }
+ }
+
private void addHolder(long request, long resource, long job) {
long lastHolder = resArenaMgr.getLastHolder(resource);
reqArenaMgr.setNextRequest(request, lastHolder);
@@ -763,7 +819,7 @@
}
private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
- if (! LOGGER.isLoggable(Level.FINEST)) {
+ if (! LOGGER.isLoggable(LVL)) {
return;
}
StringBuilder sb = new StringBuilder();
@@ -781,7 +837,7 @@
sb.append(" , jobId : ").append(txnContext.getJobId());
}
sb.append(" }");
- LOGGER.finest(sb.toString());
+ LOGGER.log(LVL, sb.toString());
}
private void validateJob(ITransactionContext txnContext) throws ACIDException {
@@ -852,6 +908,7 @@
os.write(toString().getBytes());
os.flush();
} catch (IOException e) {
+ LOGGER.warning("caught exception when dumping state of ConcurrentLockManager: " + e.toString());
//ignore
}
}
@@ -976,6 +1033,7 @@
try {
condition.await();
} catch (InterruptedException e) {
+ LOGGER.finer("interrupted while wating on ResourceGroup");
throw new ACIDException(txnContext, "interrupted", e);
}
}
@@ -986,8 +1044,8 @@
}
void log(String s) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(s + " " + toString());
+ if (LOGGER.isLoggable(LVL)) {
+ LOGGER.log(LVL, s + " " + toString());
}
}