changed to upgrade record-level read locks to dataset-level read lock if the number of lock requests exceeds a threshold

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1459 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 1a3ac03..8abc5b1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -11,7 +11,7 @@
     private int firstWaitingResource; //resource(entity or dataset) which this job is waiting for
     private int upgradingResource; //resource(entity or dataset) which this job is waiting for to upgrade
 
-    //private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job. 
+    private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job. 
 
     public JobInfo(EntityInfoManager entityInfoManager, LockWaiterManager lockWaiterManager, TransactionContext txnCtx) {
         this.entityInfoManager = entityInfoManager;
@@ -20,7 +20,11 @@
         this.lastHoldingResource = -1;
         this.firstWaitingResource = -1;
         this.upgradingResource = -1;
-        //this.dLockHT = new PrimitiveIntHashMap(1<<6, 1<<3, 180000);
+        if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            //This table maintains the number of locks acquired by this jobInfo.
+            //[Notice] But this doesn't decrease the count even if the lock is released.
+            this.dLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
+        }
     }
 
     public void addHoldingResource(int resource) {
@@ -40,7 +44,9 @@
         entityInfoManager.setNextJobResource(resource, -1);
         lastHoldingResource = resource;
 
-        //increaseDatasetLockCount(resource);
+        if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            increaseDatasetLockCount(resource);
+        }
     }
 
     public void removeHoldingResource(int resource) {
@@ -181,16 +187,26 @@
         //        }
     }
 
+    private void increaseDatasetLockCount(int entityInfo) {
+        int datasetId = entityInfoManager.getDatasetId(entityInfo);
+        int count = dLockHT.get(datasetId);
+        if (count == -1) {
+            dLockHT.upsert(datasetId, 1);
+        } else {
+            dLockHT.upsert(datasetId, count + 1);
+        }
+    }
+
+    public int getDatasetLockCount(int datasetId) {
+        int count = dLockHT.get(datasetId);
+        if (count == -1) {
+            return 0;
+        } else {
+            return count;
+        }
+    }
+
     /**********************************************************************************
-     * public void increaseDatasetLockCount(int entityInfo) {
-     * int datasetId = entityInfoManager.getDatasetId(entityInfo);
-     * int count = dLockHT.get(datasetId);
-     * if (count == -1) {
-     * dLockHT.upsert(datasetId, 1);
-     * } else {
-     * dLockHT.upsert(datasetId, count+1);
-     * }
-     * }
      * public void decreaseDatasetLockCount(int entityInfo) {
      * int datasetId = entityInfoManager.getDatasetId(entityInfo);
      * int count = dLockHT.get(datasetId);
@@ -244,7 +260,7 @@
         }
     }
 
-    public String printHoldingResource () {
+    public String printHoldingResource() {
         StringBuilder s = new StringBuilder();
         int entityInfo = lastHoldingResource;
 
@@ -262,7 +278,7 @@
         }
         return s.toString();
     }
-    
+
     /////////////////////////////////////////////////////////
     //  set/get method for private variable
     /////////////////////////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index fb5bb68..f20b8be 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -45,6 +45,12 @@
 
     public static final boolean IS_DEBUG_MODE = false;//true
 
+    public static final boolean ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET = true;
+    public static final int UPGRAGE_TRHESHOLD_ENTITY_TO_DATASET = 100000;
+    private static final int DO_UPGRADE = 0;
+    private static final int UPGRADED = 1;
+    private static final int DONOT_UPGRADE = 2;
+
     private TransactionSubsystem txnSubsystem;
 
     //all threads accessing to LockManager's tables such as jobHT and datasetResourceHT
@@ -123,6 +129,24 @@
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
 
+        if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+                int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+                switch (upgradeStatus) {
+                    case DO_UPGRADE:
+                        entityHashValue = -1;
+                        break;
+                        
+                    case UPGRADED:
+                        unlatchLockTable();
+                        return;
+                        
+                    default:
+                        break;
+                }
+            }
+        }
+
         //#. if the datasetLockInfo doesn't exist in datasetResourceHT 
         if (dLockInfo == null || dLockInfo.isNoHolder()) {
             if (dLockInfo == null) {
@@ -179,6 +203,22 @@
         return;
     }
 
+    private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
+        //we currently allow upgrade only if the lockMode is S. 
+        if (lockMode != LockMode.S) {
+            return DONOT_UPGRADE;
+        }
+
+        int count = jobInfo.getDatasetLockCount(datasetId);
+        if (count == UPGRAGE_TRHESHOLD_ENTITY_TO_DATASET) {
+            return DO_UPGRADE;
+        } else if (count > UPGRAGE_TRHESHOLD_ENTITY_TO_DATASET){
+            return UPGRADED;
+        } else {
+            return DONOT_UPGRADE;
+        }
+    }
+
     private void validateJob(TransactionContext txnContext) throws ACIDException {
         if (txnContext.getTxnState() == TransactionState.ABORTED) {
             unlatchLockTable();
@@ -888,6 +928,24 @@
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
 
+        if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+            if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+                int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+                switch (upgradeStatus) {
+                    case DO_UPGRADE:
+                        entityHashValue = -1;
+                        break;
+                        
+                    case UPGRADED:
+                        unlatchLockTable();
+                        return true;
+                        
+                    default:
+                        break;
+                }
+            }
+        }
+
         //#. if the datasetLockInfo doesn't exist in datasetResourceHT 
         if (dLockInfo == null || dLockInfo.isNoHolder()) {
             if (dLockInfo == null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
index be9c080..a65f385 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/PrimitiveIntHashMap.java
@@ -530,7 +530,7 @@
         for (i = 1; i < NUM_OF_SLOTS; i++) {
             if (cArray[bucketNum][i*2] == key) {
                 if (isUpsert) {
-                    cArray[bucketNum][emptySlot*2+1] = value;
+                    cArray[bucketNum][i*2+1] = value;
                 }
                 return 0;
             }