merge -r1655:1701 from asterix_lsm_stabilization to asterix_lsm_stabilization_kisskys
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization_kisskys@1702 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index aeb54b5..b1a28ca 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -46,6 +46,7 @@
public static final boolean IS_DEBUG_MODE = false;//true
public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
+ //The threshold must be greater than 1 and should be large enough that locks are not escalated too soon.
public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
private static final int DO_ESCALATE = 0;
private static final int ESCALATED = 1;
@@ -103,11 +104,11 @@
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
throws ACIDException {
- internalLock(datasetId, entityHashValue, lockMode, txnContext);
+ internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
}
- private void internalLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
- throws ACIDException {
+ private void internalLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext,
+ boolean isInstant) throws ACIDException {
JobId jobId = txnContext.getJobId();
int jId = jobId.getId(); //int-type jobId
@@ -117,7 +118,7 @@
DatasetLockInfo dLockInfo = null;
JobInfo jobInfo;
byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
- boolean isEscalated = false;
+ boolean doEscalate = false;
latchLockTable();
validateJob(txnContext);
@@ -131,12 +132,12 @@
jobInfo = jobHT.get(jobId);
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
- int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
- switch (upgradeStatus) {
+ if (!isInstant && datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
+ int escalateStatus = needEscalateFromEntityToDataset(jobInfo, dId, lockMode);
+ switch (escalateStatus) {
case DO_ESCALATE:
entityHashValue = -1;
- isEscalated = true;
+ doEscalate = true;
break;
case ESCALATED:
@@ -180,8 +181,12 @@
jobInfo.addHoldingResource(entityInfo);
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (datasetLockMode == LockMode.IS) {
+ if (!isInstant && datasetLockMode == LockMode.IS) {
jobInfo.increaseDatasetISLockCount(dId);
+ if (doEscalate) {
+ throw new IllegalStateException("ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
+ + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ }
}
}
@@ -204,11 +209,21 @@
}
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (isEscalated) {
- releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
- }
- if (jobInfo != null && datasetLockMode == LockMode.IS) {
- jobInfo.increaseDatasetISLockCount(dId);
+ if (!isInstant) {
+ if (doEscalate) {
+ //jobInfo must not be null.
+ assert jobInfo != null;
+ jobInfo.increaseDatasetISLockCount(dId);
+ //release pre-acquired locks
+ releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+ } else if (datasetLockMode == LockMode.IS) {
+ if (jobInfo == null) {
+ jobInfo = jobHT.get(jobId);
+ //jobInfo must not be null.
+ assert jobInfo != null;
+ }
+ jobInfo.increaseDatasetISLockCount(dId);
+ }
}
}
@@ -247,7 +262,7 @@
}
}
- private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
+ private int needEscalateFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
//we currently allow upgrade only if the lockMode is S.
if (lockMode != LockMode.S) {
return DONOT_ESCALATE;
@@ -594,17 +609,21 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false);
+ internalUnlock(datasetId, entityHashValue, txnContext, false, false);
}
@Override
public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag)
throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, commitFlag);
+ internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+ }
+
+ private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
+ internalUnlock(datasetId, entityHashValue, txnContext, true, false);
}
private void internalUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext,
- boolean commitFlag) throws ACIDException {
+ boolean isInstant, boolean commitFlag) throws ACIDException {
JobId jobId = txnContext.getJobId();
int eLockInfo = -1;
DatasetLockInfo dLockInfo = null;
@@ -731,7 +750,7 @@
//since the datasetLockInfo is likely to be referred to again.
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (datasetLockMode == LockMode.IS) {
+ if (!isInstant && datasetLockMode == LockMode.IS) {
jobInfo.decreaseDatasetISLockCount(datasetId.getId());
}
}
@@ -949,14 +968,14 @@
// } finally {
// unlock(datasetId, entityHashValue, txnContext);
// }
- internalLock(datasetId, entityHashValue, lockMode, txnContext);
- unlock(datasetId, entityHashValue, txnContext);
+ internalLock(datasetId, entityHashValue, lockMode, txnContext, true);
+ instantUnlock(datasetId, entityHashValue, txnContext);
}
@Override
public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
throws ACIDException {
- return internalTryLock(datasetId, entityHashValue, lockMode, txnContext);
+ return internalTryLock(datasetId, entityHashValue, lockMode, txnContext, false);
}
@Override
@@ -971,15 +990,15 @@
// unlock(datasetId, entityHashValue, txnContext);
// }
// }
- isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext);
+ isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext, true);
if (isGranted) {
- unlock(datasetId, entityHashValue, txnContext);
+ instantUnlock(datasetId, entityHashValue, txnContext);
}
return isGranted;
}
private boolean internalTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
- TransactionContext txnContext) throws ACIDException {
+ TransactionContext txnContext, boolean isInstant) throws ACIDException {
JobId jobId = txnContext.getJobId();
int jId = jobId.getId(); //int-type jobId
int dId = datasetId.getId(); //int-type datasetId
@@ -989,7 +1008,7 @@
JobInfo jobInfo;
byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
boolean isSuccess = false;
- boolean isEscalated = false;
+ boolean doEscalate = false;
latchLockTable();
validateJob(txnContext);
@@ -1003,12 +1022,12 @@
jobInfo = jobHT.get(jobId);
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
- int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+ if (!isInstant && datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
+ int upgradeStatus = needEscalateFromEntityToDataset(jobInfo, dId, lockMode);
switch (upgradeStatus) {
case DO_ESCALATE:
entityHashValue = -1;
- isEscalated = true;
+ doEscalate = true;
break;
case ESCALATED:
@@ -1052,8 +1071,14 @@
jobInfo.addHoldingResource(entityInfo);
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (datasetLockMode == LockMode.IS) {
+ if (!isInstant && datasetLockMode == LockMode.IS) {
jobInfo.increaseDatasetISLockCount(dId);
+ if (doEscalate) {
+ //This exception is thrown when the threshold value is set to 1.
+ //We don't want to allow lock escalation on the very first lock request on a dataset.
+ throw new IllegalStateException("ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
+ + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ }
}
}
@@ -1083,11 +1108,21 @@
}
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
- if (isEscalated) {
- releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
- }
- if (jobInfo != null && datasetLockMode == LockMode.IS) {
- jobInfo.increaseDatasetISLockCount(dId);
+ if (!isInstant) {
+ if (doEscalate) {
+ //jobInfo must not be null.
+ assert jobInfo != null;
+ jobInfo.increaseDatasetISLockCount(dId);
+ //release pre-acquired locks
+ releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+ } else if (datasetLockMode == LockMode.IS) {
+ if (jobInfo == null) {
+ jobInfo = jobHT.get(jobId);
+ //jobInfo must not be null.
+ assert jobInfo != null;
+ }
+ jobInfo.increaseDatasetISLockCount(dId);
+ }
}
}