1) fixed read-lock escalation bug by mimicking SIX mode, allowing S and IX to coexist at the dataset level, and 2) released all previously acquired locks when escalation completed

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1472 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 8abc5b1..75ff349 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -11,7 +11,7 @@
     private int firstWaitingResource; //resource(entity or dataset) which this job is waiting for
     private int upgradingResource; //resource(entity or dataset) which this job is waiting for to upgrade
 
-    private PrimitiveIntHashMap dLockHT; //used for keeping dataset-granule-lock's count acquired by this job. 
+    private PrimitiveIntHashMap datasetISLockHT; //used for keeping dataset-granule-lock's count acquired by this job. 
 
     public JobInfo(EntityInfoManager entityInfoManager, LockWaiterManager lockWaiterManager, TransactionContext txnCtx) {
         this.entityInfoManager = entityInfoManager;
@@ -20,10 +20,10 @@
         this.lastHoldingResource = -1;
         this.firstWaitingResource = -1;
         this.upgradingResource = -1;
-        if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
+        if (LockManager.ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
             //This table maintains the number of locks acquired by this jobInfo.
             //[Notice] But this doesn't decrease the count even if the lock is released.
-            this.dLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
+            this.datasetISLockHT = new PrimitiveIntHashMap(1 << 4, 1 << 2, Integer.MAX_VALUE);
         }
     }
 
@@ -43,10 +43,6 @@
         entityInfoManager.setPrevJobResource(resource, lastHoldingResource);
         entityInfoManager.setNextJobResource(resource, -1);
         lastHoldingResource = resource;
-
-        if (LockManager.ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
-            increaseDatasetLockCount(resource);
-        }
     }
 
     public void removeHoldingResource(int resource) {
@@ -187,18 +183,30 @@
         //        }
     }
 
-    private void increaseDatasetLockCount(int entityInfo) {
-        int datasetId = entityInfoManager.getDatasetId(entityInfo);
-        int count = dLockHT.get(datasetId);
+    public void increaseDatasetISLockCount(int datasetId) {
+        int count = datasetISLockHT.get(datasetId);
         if (count == -1) {
-            dLockHT.upsert(datasetId, 1);
+            datasetISLockHT.upsert(datasetId, 1);
         } else {
-            dLockHT.upsert(datasetId, count + 1);
+            datasetISLockHT.upsert(datasetId, count + 1);
         }
     }
 
-    public int getDatasetLockCount(int datasetId) {
-        int count = dLockHT.get(datasetId);
+    public void decreaseDatasetISLockCount(int datasetId) {
+        int count = datasetISLockHT.get(datasetId);
+        if (count >= LockManager.ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+            //do not decrease the count since it is already escalated.
+        } else if (count > 1) {
+            datasetISLockHT.upsert(datasetId, count - 1);
+        } else if (count == 1) {
+            datasetISLockHT.remove(datasetId);
+        } else if (count <= 0) {
+            throw new IllegalStateException("Illegal state of datasetLock count in JobInfo's dLockHT");
+        }
+    }
+
+    public int getDatasetISLockCount(int datasetId) {
+        int count = datasetISLockHT.get(datasetId);
         if (count == -1) {
             return 0;
         } else {
@@ -207,17 +215,6 @@
     }
 
     /**********************************************************************************
-     * public void decreaseDatasetLockCount(int entityInfo) {
-     * int datasetId = entityInfoManager.getDatasetId(entityInfo);
-     * int count = dLockHT.get(datasetId);
-     * if (count > 1) {
-     * dLockHT.upsert(datasetId, count-1);
-     * } else if (count == 1) {
-     * dLockHT.remove(datasetId);
-     * } else if (count <= 0 ) {
-     * throw new IllegalStateException("Illegal state of datasetLock count in JobInfo's dLockHT");
-     * }
-     * }
      * public boolean isDatasetLockGranted(int datasetId) {
      * return dLockHT.get(datasetId) == -1 ? false : true;
      * }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 5687656..1d47c0b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -45,11 +45,11 @@
 
     public static final boolean IS_DEBUG_MODE = false;//true
 
-    public static final boolean ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET = true;
-    public static final int UPGRADE_TRHESHOLD_ENTITY_TO_DATASET = 10000;
-    private static final int DO_UPGRADE = 0;
-    private static final int UPGRADED = 1;
-    private static final int DONOT_UPGRADE = 2;
+    public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
+    public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
+    private static final int DO_ESCALATE = 0;
+    private static final int ESCALATED = 1;
+    private static final int DONOT_ESCALATE = 2;
 
     private TransactionSubsystem txnSubsystem;
 
@@ -117,6 +117,7 @@
         DatasetLockInfo dLockInfo = null;
         JobInfo jobInfo;
         byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
+        boolean isEscalated = false;
 
         latchLockTable();
         validateJob(txnContext);
@@ -129,18 +130,19 @@
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
 
-        if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
-            if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+            if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
                 int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
                 switch (upgradeStatus) {
-                    case DO_UPGRADE:
+                    case DO_ESCALATE:
                         entityHashValue = -1;
+                        isEscalated = true;
                         break;
-                        
-                    case UPGRADED:
+
+                    case ESCALATED:
                         unlatchLockTable();
                         return;
-                        
+
                     default:
                         break;
                 }
@@ -177,6 +179,12 @@
             }
             jobInfo.addHoldingResource(entityInfo);
 
+            if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+                if (datasetLockMode == LockMode.IS) {
+                    jobInfo.increaseDatasetISLockCount(dId);
+                }
+            }
+
             if (IS_DEBUG_MODE) {
                 trackLockRequest("Granted", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext,
                         dLockInfo, eLockInfo);
@@ -195,6 +203,15 @@
             lockEntityGranule(datasetId, entityHashValue, lockMode, entityInfo, txnContext);
         }
 
+        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+            if (isEscalated) {
+                releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+            }
+            if (datasetLockMode == LockMode.IS) {
+                jobInfo.increaseDatasetISLockCount(dId);
+            }
+        }
+
         if (IS_DEBUG_MODE) {
             trackLockRequest("Granted", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext, dLockInfo,
                     eLockInfo);
@@ -203,19 +220,46 @@
         return;
     }
 
+    private void releaseDatasetISLocks(JobInfo jobInfo, JobId jobId, DatasetId datasetId, TransactionContext txnContext)
+            throws ACIDException {
+        int entityInfo;
+        int prevEntityInfo;
+        int entityHashValue;
+        int did;//int-type dataset id
+
+        //while traversing all holding resources, 
+        //release IS locks on the escalated dataset and
+        //release S locks on the corresponding entities
+        //by calling unlock() method.
+        entityInfo = jobInfo.getLastHoldingResource();
+        while (entityInfo != -1) {
+            prevEntityInfo = entityInfoManager.getPrevJobResource(entityInfo);
+
+            //release a lock only if the dataset is the escalated dataset and
+            //the entityHashValue is not -1("not -1" means a non-dataset-level lock)
+            did = entityInfoManager.getDatasetId(entityInfo);
+            entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
+            if (did == datasetId.getId() && entityHashValue != -1) {
+                this.unlock(datasetId, entityHashValue, txnContext);
+            }
+
+            entityInfo = prevEntityInfo;
+        }
+    }
+
     private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
         //we currently allow upgrade only if the lockMode is S. 
         if (lockMode != LockMode.S) {
-            return DONOT_UPGRADE;
+            return DONOT_ESCALATE;
         }
 
-        int count = jobInfo.getDatasetLockCount(datasetId);
-        if (count == UPGRADE_TRHESHOLD_ENTITY_TO_DATASET) {
-            return DO_UPGRADE;
-        } else if (count > UPGRADE_TRHESHOLD_ENTITY_TO_DATASET){
-            return UPGRADED;
+        int count = jobInfo.getDatasetISLockCount(datasetId);
+        if (count == ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+            return DO_ESCALATE;
+        } else if (count > ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+            return ESCALATED;
         } else {
-            return DONOT_UPGRADE;
+            return DONOT_ESCALATE;
         }
     }
 
@@ -319,6 +363,27 @@
                     || !dLockInfo.isCompatible(datasetLockMode)) {
 
                 /////////////////////////////////////////////////////////////////////////////////////////////
+                //[Notice] Mimicking SIX mode
+                //When the lock escalation from IS to S in dataset-level is allowed, the following case occurs
+                //DatasetLockInfo's SCount = 1 and the same job who carried out the escalation tries to insert,
+                //then the job should be able to insert without being blocked by itself. 
+                //Our approach is to introduce SIX mode, but we don't have it currently, 
+                //so I simply mimic SIX by allowing S and IX to coexist in the dataset level 
+                //only if their job id is identical for the requests. 
+                if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+                    if (datasetLockMode == LockMode.IX && dLockInfo.getSCount() == 1
+                            && jobInfo.isDatasetLockGranted(dId, LockMode.S)) {
+                        entityInfoManager.increaseDatasetLockCount(entityInfo);
+                        //IX holders are implicitly handled without adding holder
+                        dLockInfo.increaseLockCount(datasetLockMode);
+                        //add entityInfo to JobInfo's holding-resource list
+                        jobInfo.addHoldingResource(entityInfo);
+                        return entityInfo;
+                    }
+                }
+                ///////////////////////////////////////////////////////////////////////////////////////////////
+
+                /////////////////////////////////////////////////////////////////////////////////////////////
                 //[Notice]
                 //There has been no same caller as (jId, dId, entityHashValue) triplet.
                 //But there could be the same caller as (jId, dId) pair.
@@ -545,6 +610,7 @@
         DatasetLockInfo dLockInfo = null;
         JobInfo jobInfo;
         int entityInfo = -1;
+        byte datasetLockMode;
 
         if (IS_DEBUG_MODE) {
             if (entityHashValue == -1) {
@@ -584,9 +650,10 @@
                     + entityHashValue + "]: Corresponding lock info doesn't exist.");
         }
 
+        datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS : LockMode.IX;
+
         //decrease the corresponding count of dLockInfo/eLockInfo/entityInfo
-        dLockInfo.decreaseLockCount(entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS
-                : LockMode.IX);
+        dLockInfo.decreaseLockCount(datasetLockMode);
         entityLockInfoManager.decreaseLockCount(eLockInfo, entityInfoManager.getEntityLockMode(entityInfo));
         entityInfoManager.decreaseDatasetLockCount(entityInfo);
         entityInfoManager.decreaseEntityLockCount(entityInfo);
@@ -663,6 +730,12 @@
         //we don't deallocate datasetLockInfo even if there is no txn referring to the datasetLockInfo
         //since the datasetLockInfo is likely to be referred to again.
 
+        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+            if (datasetLockMode == LockMode.IS) {
+                jobInfo.decreaseDatasetISLockCount(datasetId.getId());
+            }
+        }
+
         if (IS_DEBUG_MODE) {
             trackLockRequest("Granted", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
                     dLockInfo, eLockInfo);
@@ -916,6 +989,7 @@
         JobInfo jobInfo;
         byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
         boolean isSuccess = false;
+        boolean isEscalated = false;
 
         latchLockTable();
         validateJob(txnContext);
@@ -928,18 +1002,19 @@
         dLockInfo = datasetResourceHT.get(datasetId);
         jobInfo = jobHT.get(jobId);
 
-        if (ALLOW_UPGRADE_FROM_ENTITY_TO_DATASET) {
-            if (jobInfo != null && dLockInfo != null && entityHashValue != -1) {
+        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+            if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
                 int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
                 switch (upgradeStatus) {
-                    case DO_UPGRADE:
+                    case DO_ESCALATE:
                         entityHashValue = -1;
+                        isEscalated = true;
                         break;
-                        
-                    case UPGRADED:
+
+                    case ESCALATED:
                         unlatchLockTable();
                         return true;
-                        
+
                     default:
                         break;
                 }
@@ -976,6 +1051,12 @@
             }
             jobInfo.addHoldingResource(entityInfo);
 
+            if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+                if (datasetLockMode == LockMode.IS) {
+                    jobInfo.increaseDatasetISLockCount(dId);
+                }
+            }
+
             if (IS_DEBUG_MODE) {
                 trackLockRequest("Granted", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,
                         dLockInfo, eLockInfo);
@@ -1001,6 +1082,15 @@
             }
         }
 
+        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
+            if (isEscalated) {
+                releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+            }
+            if (datasetLockMode == LockMode.IS) {
+                jobInfo.increaseDatasetISLockCount(dId);
+            }
+        }
+
         if (IS_DEBUG_MODE) {
             if (isSuccess) {
                 trackLockRequest("Granted", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,