changed to upgrade record-level read locks to dataset-level read lock if the number of lock requests exceeds a threshold
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1459 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 1a3ac03..8abc5b1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -11,7 +11,7 @@
private int firstWaitingResource; //resource(entity or dataset) which this job is waiting for
private int upgradingResource; //resource(entity or dataset) which this job is waiting for to upgrade
- //private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job.
+ private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job.
public JobInfo(EntityInfoManager entityInfoManager, LockWaiterManager lockWaiterManager, TransactionContext txnCtx) {
this.entityInfoManager = entityInfoManager;
@@ -20,7 +20,11 @@
this.lastHoldingResource = -1;
this.firstWaitingResource = -1;
this.upgradingResource = -1;
- //this.dLockHT = new PrimitiveIntHashMap(1<<6, 1<<3, 180000);
+ if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ //This table maintains the number of locks acquired by this jobInfo.
+ //[Notice] But this doesn't decrease the count even if the lock is released.
+ this.dLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
+ }
}
public void addHoldingResource(int resource) {
@@ -40,7 +44,9 @@
entityInfoManager.setNextJobResource(resource, -1);
lastHoldingResource = resource;
- //increaseDatasetLockCount(resource);
+ if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ increaseDatasetLockCount(resource);
+ }
}
public void removeHoldingResource(int resource) {
@@ -181,16 +187,26 @@
// }
}
+ private void increaseDatasetLockCount(int entityInfo) {
+ int datasetId = entityInfoManager.getDatasetId(entityInfo);
+ int count = dLockHT.get(datasetId);
+ if (count == -1) {
+ dLockHT.upsert(datasetId, 1);
+ } else {
+ dLockHT.upsert(datasetId, count + 1);
+ }
+ }
+
+ public int getDatasetLockCount(int datasetId) {
+ int count = dLockHT.get(datasetId);
+ if (count == -1) {
+ return 0;
+ } else {
+ return count;
+ }
+ }
+
/**********************************************************************************
- * public void increaseDatasetLockCount(int entityInfo) {
- * int datasetId = entityInfoManager.getDatasetId(entityInfo);
- * int count = dLockHT.get(datasetId);
- * if (count == -1) {
- * dLockHT.upsert(datasetId, 1);
- * } else {
- * dLockHT.upsert(datasetId, count+1);
- * }
- * }
* public void decreaseDatasetLockCount(int entityInfo) {
* int datasetId = entityInfoManager.getDatasetId(entityInfo);
* int count = dLockHT.get(datasetId);
@@ -244,7 +260,7 @@
}
}
- public String printHoldingResource () {
+ public String printHoldingResource() {
StringBuilder s = new StringBuilder();
int entityInfo = lastHoldingResource;
@@ -262,7 +278,7 @@
}
return s.toString();
}
-
+
/////////////////////////////////////////////////////////
// set/get method for private variable
/////////////////////////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index fb5bb68..f20b8be 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -45,6 +45,12 @@
public static final boolean IS_DEBUG_MODE = false;//true
+ public static final boolean ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET = true;
+ public static final int UPGRAGE_TRHESHOLD_ENTITY_TO_DATASET = 100000;
+ private static final int DO_UPGRADE = 0;
+ private static final int UPGRADED = 1;
+ private static final int DONOT_UPGRADE = 2;
+
private TransactionSubsystem txnSubsystem;
//all threads accessing to LockManager's tables such as jobHT and datasetResourceHT
@@ -123,6 +129,24 @@
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
+ if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+ int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+ switch (upgradeStatus) {
+ case DO_UPGRADE:
+ entityHashValue = -1;
+ break;
+
+ case UPGRADED:
+ unlatchLockTable();
+ return;
+
+ default:
+ break;
+ }
+ }
+ }
+
//#. if the datasetLockInfo doesn't exist in datasetResourceHT
if (dLockInfo == null || dLockInfo.isNoHolder()) {
if (dLockInfo == null) {
@@ -179,6 +203,22 @@
return;
}
+ private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
+ //we currently allow upgrade only if the lockMode is S.
+ if (lockMode != LockMode.S) {
+ return DONOT_UPGRADE;
+ }
+
+ int count = jobInfo.getDatasetLockCount(datasetId);
+ if (count == UPGRAGE_TRHESHOLD_ENTITY_TO_DATASET) {
+ return DO_UPGRADE;
+ } else if (count > UPGRAGE_TRHESHOLD_ENTITY_TO_DATASET){
+ return UPGRADED;
+ } else {
+ return DONOT_UPGRADE;
+ }
+ }
+
private void validateJob(TransactionContext txnContext) throws ACIDException {
if (txnContext.getTxnState() == TransactionState.ABORTED) {
unlatchLockTable();
@@ -888,6 +928,24 @@
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
+ if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+ if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+ int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+ switch (upgradeStatus) {
+ case DO_UPGRADE:
+ entityHashValue = -1;
+ break;
+
+ case UPGRADED:
+ unlatchLockTable();
+ return true;
+
+ default:
+ break;
+ }
+ }
+ }
+
//#. if the datasetLockInfo doesn't exist in datasetResourceHT
if (dLockInfo == null || dLockInfo.isNoHolder()) {
if (dLockInfo == null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
index be9c080..a65f385 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
@@ -530,7 +530,7 @@
for (i = 1; i < NUM_OF_SLOTS; i++) {
if (cArray[bucketNum][i*2] == key) {
if (isUpsert) {
- cArray[bucketNum][emptySlot*2+1] = value;
+ cArray[bucketNum][i*2+1] = value;
}
return 0;
}