1)fixed read-lock escalation bug by mimicking SIX mode with allowing S and IX coexist in dataset-level and 2) released all previousely acquired locks when escalation completed
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1472 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 8abc5b1..75ff349 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -11,7 +11,7 @@
private int firstWaitingResource; //resource(entity or dataset) which this job is waiting for
private int upgradingResource; //resource(entity or dataset) which this job is waiting for to upgrade
- private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job.
+ private PrimitiveIntHashMap datasetISLockHT; //used for keeping dataset-granule-lock's count acquired by this job.
public JobInfo(EntityInfoManager entityInfoManager, LockWaiterManager lockWaiterManager, TransactionContext txnCtx) {
this.entityInfoManager = entityInfoManager;
@@ -20,10 +20,10 @@
this.lastHoldingResource = -1;
this.firstWaitingResource = -1;
this.upgradingResource = -1;
- if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ if (LockManager.ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
//This table maintains the number of locks acquired by this jobInfo.
//[Notice] But this doesn't decrease the count even if the lock is released.
- this.dLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
+ this.datasetISLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
}
}
@@ -43,10 +43,6 @@
entityInfoManager.setPrevJobResource(resource, lastHoldingResource);
entityInfoManager.setNextJobResource(resource, -1);
lastHoldingResource = resource;
-
- if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
- increaseDatasetLockCount(resource);
- }
}
public void removeHoldingResource(int resource) {
@@ -187,18 +183,30 @@
// }
}
- private void increaseDatasetLockCount(int entityInfo) {
- int datasetId = entityInfoManager.getDatasetId(entityInfo);
- int count = dLockHT.get(datasetId);
+ public void increaseDatasetISLockCount(int datasetId) {
+ int count = datasetISLockHT.get(datasetId);
if (count == -1) {
- dLockHT.upsert(datasetId, 1);
+ datasetISLockHT.upsert(datasetId, 1);
} else {
- dLockHT.upsert(datasetId, count + 1);
+ datasetISLockHT.upsert(datasetId, count + 1);
}
}
- public int getDatasetLockCount(int datasetId) {
- int count = dLockHT.get(datasetId);
+ public void decreaseDatasetISLockCount(int datasetId) {
+ int count = datasetISLockHT.get(datasetId);
+ if (count >= LockManager.ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ //do not decrease the count since it is already escalated.
+ } else if (count > 1) {
+ datasetISLockHT.upsert(datasetId, count - 1);
+ } else if (count == 1) {
+ datasetISLockHT.remove(datasetId);
+ } else if (count <= 0) {
+ throw new IllegalStateException("Illegal state of datasetLock count in JobInfo's dLockHT");
+ }
+ }
+
+ public int getDatasetISLockCount(int datasetId) {
+ int count = datasetISLockHT.get(datasetId);
if (count == -1) {
return 0;
} else {
@@ -207,17 +215,6 @@
}
/**********************************************************************************
- * public void decreaseDatasetLockCount(int entityInfo) {
- * int datasetId = entityInfoManager.getDatasetId(entityInfo);
- * int count = dLockHT.get(datasetId);
- * if (count > 1) {
- * dLockHT.upsert(datasetId, count-1);
- * } else if (count == 1) {
- * dLockHT.remove(datasetId);
- * } else if (count <= 0 ) {
- * throw new IllegalStateException("Illegal state of datasetLock count in JobInfo's dLockHT");
- * }
- * }
* public boolean isDatasetLockGranted(int datasetId) {
* return dLockHT.get(datasetId) == -1 ? false : true;
* }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 5687656..1d47c0b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -45,11 +45,11 @@
public static final boolean IS_DEBUG_MODE = false;//true
- public static final boolean ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET = true;
- public static final int UPGRADE_TRHESHOLD_ENTITY_TO_DATASET = 10000;
- private static final int DO_UPGRADE = 0;
- private static final int UPGRADED = 1;
- private static final int DONOT_UPGRADE = 2;
+ public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
+ public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
+ private static final int DO_ESCALATE = 0;
+ private static final int ESCALATED = 1;
+ private static final int DONOT_ESCALATE = 2;
private TransactionSubsystem txnSubsystem;
@@ -117,6 +117,7 @@
DatasetLockInfo dLockInfo = null;
JobInfo jobInfo;
byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
+ boolean isEscalated = false;
latchLockTable();
validateJob(txnContext);
@@ -129,18 +130,19 @@
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
- if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
- if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
switch (upgradeStatus) {
- case DO_UPGRADE:
+ case DO_ESCALATE:
entityHashValue = -1;
+ isEscalated = true;
break;
-
- case UPGRADED:
+
+ case ESCALATED:
unlatchLockTable();
return;
-
+
default:
break;
}
@@ -177,6 +179,12 @@
}
jobInfo.addHoldingResource(entityInfo);
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (datasetLockMode == LockMode.IS) {
+ jobInfo.increaseDatasetISLockCount(dId);
+ }
+ }
+
if (IS_DEBUG_MODE) {
trackLockRequest("Granted", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext,
dLockInfo, eLockInfo);
@@ -195,6 +203,15 @@
lockEntityGranule(datasetId, entityHashValue, lockMode, entityInfo, txnContext);
}
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (isEscalated) {
+ releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+ }
+ if (datasetLockMode == LockMode.IS) {
+ jobInfo.increaseDatasetISLockCount(dId);
+ }
+ }
+
if (IS_DEBUG_MODE) {
trackLockRequest("Granted", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext, dLockInfo,
eLockInfo);
@@ -203,19 +220,46 @@
return;
}
+ private void releaseDatasetISLocks(JobInfo jobInfo, JobId jobId, DatasetId datasetId, TransactionContext txnContext)
+ throws ACIDException {
+ int entityInfo;
+ int prevEntityInfo;
+ int entityHashValue;
+ int did;//int-type dataset id
+
+ //while traversing all holding resources,
+ //release IS locks on the escalated dataset and
+ //release S locks on the corresponding enttites
+ //by calling unlock() method.
+ entityInfo = jobInfo.getLastHoldingResource();
+ while (entityInfo != -1) {
+ prevEntityInfo = entityInfoManager.getPrevJobResource(entityInfo);
+
+ //release a lock only if the datset is the escalated dataset and
+ //the entityHashValue is not -1("not -1" means a non-dataset-level lock)
+ did = entityInfoManager.getDatasetId(entityInfo);
+ entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
+ if (did == datasetId.getId() && entityHashValue != -1) {
+ this.unlock(datasetId, entityHashValue, txnContext);
+ }
+
+ entityInfo = prevEntityInfo;
+ }
+ }
+
private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
//we currently allow upgrade only if the lockMode is S.
if (lockMode != LockMode.S) {
- return DONOT_UPGRADE;
+ return DONOT_ESCALATE;
}
- int count = jobInfo.getDatasetLockCount(datasetId);
- if (count == UPGRADE_TRHESHOLD_ENTITY_TO_DATASET) {
- return DO_UPGRADE;
- } else if (count > UPGRADE_TRHESHOLD_ENTITY_TO_DATASET){
- return UPGRADED;
+ int count = jobInfo.getDatasetISLockCount(datasetId);
+ if (count == ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ return DO_ESCALATE;
+ } else if (count > ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ return ESCALATED;
} else {
- return DONOT_UPGRADE;
+ return DONOT_ESCALATE;
}
}
@@ -319,6 +363,27 @@
|| !dLockInfo.isCompatible(datasetLockMode)) {
/////////////////////////////////////////////////////////////////////////////////////////////
+ //[Notice] Mimicking SIX mode
+ //When the lock escalation from IS to S in dataset-level is allowed, the following case occurs
+ //DatasetLockInfo's SCount = 1 and the same job who carried out the escalation tries to insert,
+ //then the job should be able to insert without being blocked by itself.
+ //Our approach is to introduce SIX mode, but we don't have currently,
+ //so I simply mimicking SIX by allowing S and IX coexist in the dataset level
+ //only if their job id is identical for the requests.
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (datasetLockMode == LockMode.IX && dLockInfo.getSCount() == 1
+ && jobInfo.isDatasetLockGranted(dId, LockMode.S)) {
+ entityInfoManager.increaseDatasetLockCount(entityInfo);
+ //IX holders are implicitly handled without adding holder
+ dLockInfo.increaseLockCount(datasetLockMode);
+ //add entityInfo to JobInfo's holding-resource list
+ jobInfo.addHoldingResource(entityInfo);
+ return entityInfo;
+ }
+ }
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+
+ /////////////////////////////////////////////////////////////////////////////////////////////
//[Notice]
//There has been no same caller as (jId, dId, entityHashValue) triplet.
//But there could be the same caller as (jId, dId) pair.
@@ -545,6 +610,7 @@
DatasetLockInfo dLockInfo = null;
JobInfo jobInfo;
int entityInfo = -1;
+ byte datasetLockMode;
if (IS_DEBUG_MODE) {
if (entityHashValue == -1) {
@@ -584,9 +650,10 @@
+ entityHashValue + "]: Corresponding lock info doesn't exist.");
}
+ datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS : LockMode.IX;
+
//decrease the corresponding count of dLockInfo/eLockInfo/entityInfo
- dLockInfo.decreaseLockCount(entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS
- : LockMode.IX);
+ dLockInfo.decreaseLockCount(datasetLockMode);
entityLockInfoManager.decreaseLockCount(eLockInfo, entityInfoManager.getEntityLockMode(entityInfo));
entityInfoManager.decreaseDatasetLockCount(entityInfo);
entityInfoManager.decreaseEntityLockCount(entityInfo);
@@ -663,6 +730,12 @@
//we don't deallocate datasetLockInfo even if there is no txn referring to the datasetLockInfo
//since the datasetLockInfo is likely to be referred to again.
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (datasetLockMode == LockMode.IS) {
+ jobInfo.decreaseDatasetISLockCount(datasetId.getId());
+ }
+ }
+
if (IS_DEBUG_MODE) {
trackLockRequest("Granted", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
dLockInfo, eLockInfo);
@@ -916,6 +989,7 @@
JobInfo jobInfo;
byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
boolean isSuccess = false;
+ boolean isEscalated = false;
latchLockTable();
validateJob(txnContext);
@@ -928,18 +1002,19 @@
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
- if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
- if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
switch (upgradeStatus) {
- case DO_UPGRADE:
+ case DO_ESCALATE:
entityHashValue = -1;
+ isEscalated = true;
break;
-
- case UPGRADED:
+
+ case ESCALATED:
unlatchLockTable();
return true;
-
+
default:
break;
}
@@ -976,6 +1051,12 @@
}
jobInfo.addHoldingResource(entityInfo);
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (datasetLockMode == LockMode.IS) {
+ jobInfo.increaseDatasetISLockCount(dId);
+ }
+ }
+
if (IS_DEBUG_MODE) {
trackLockRequest("Granted", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,
dLockInfo, eLockInfo);
@@ -1001,6 +1082,15 @@
}
}
+ if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+ if (isEscalated) {
+ releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+ }
+ if (datasetLockMode == LockMode.IS) {
+ jobInfo.increaseDatasetISLockCount(dId);
+ }
+ }
+
if (IS_DEBUG_MODE) {
if (isSuccess) {
trackLockRequest("Granted", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,