add cache for dataset locks (they are given up when the job finishes)
minor cleanup
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 124b406..635ee42 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -51,6 +52,7 @@
     private RequestArenaManager reqArenaMgr;
     private JobArenaManager jobArenaMgr;
     private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+    private ThreadLocal<DatasetLockCache> dsLockCache;
         
     enum LockAction {
         GET,
@@ -80,6 +82,11 @@
         reqArenaMgr = new RequestArenaManager(lockManagerShrinkTimer);
         jobArenaMgr = new JobArenaManager(lockManagerShrinkTimer);
         jobIdSlotMap = new ConcurrentHashMap<>();
+        dsLockCache = new ThreadLocal<DatasetLockCache>() {
+            protected DatasetLockCache initialValue() {
+                return new DatasetLockCache();
+            }
+        };
     }
 
     public AsterixTransactionProperties getTransactionProperties() {
@@ -92,15 +99,19 @@
         
         log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         
+        final int dsId = datasetId.getId();        
+        final int jobId = txnContext.getJobId().getId();
+        
         if (entityHashValue != -1) {
             // get the intention lock on the dataset, if we want to lock an individual item
-            byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
-            lock(datasetId, -1, dsLockMode, txnContext);
+            final byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                lock(datasetId, -1, dsLockMode, txnContext);
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
         }
 
-        int dsId = datasetId.getId();
-        
-        long jobSlot = getJobSlot(txnContext.getJobId().getId());
+        long jobSlot = getJobSlot(jobId);
         
         ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
@@ -195,16 +206,21 @@
             throws ACIDException {
         log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
         if (entityHashValue != -1) {
             // get the intention lock on the dataset, if we want to lock an individual item
             byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
-            if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
-                return false;
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+                    return false;
+                }
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
             }
         }
 
-        int dsId = datasetId.getId();
-        long jobSlot = getJobSlot(txnContext.getJobId().getId());
+        long jobSlot = getJobSlot(jobId);
         
         boolean locked = false;
 
@@ -245,10 +261,8 @@
             group.releaseLatch();
         }
         
-        // if we did acquire the dataset lock, but not the entity lock, we need to remove it
-        if (!locked && entityHashValue != -1) {
-            unlock(datasetId, -1, txnContext);
-        }
+        // if we did acquire the dataset lock, but not the entity lock, we keep
+        // it anyway and clean it up at the end of the job
         
         return locked;
     }
@@ -315,11 +329,7 @@
             group.releaseLatch();
         }
 
-        // finally remove the intention lock as well
-        if (entityHashValue != -1) {
-            // remove the intention lock on the dataset, if we want to lock an individual item
-            unlock(datasetId, -1, txnContext);
-        }
+        // dataset intention locks are cleaned up at the end of the job
     }
     
     @Override
@@ -387,17 +397,6 @@
         return reqSlot;
     }
 
-    private long findLastHolderForJob(long resource, long job) {
-        long holder = resArenaMgr.getLastHolder(resource);
-        while (holder != -1) {
-            if (job == reqArenaMgr.getJobSlot(holder)) {
-                return holder;
-            }
-            holder = reqArenaMgr.getNextRequest(holder);
-        }
-        return -1;
-    }
-    
     /**
      * when we've got a lock conflict for a different job, we always have to
      * wait, if it is for the same job we either have to
@@ -410,8 +409,8 @@
      * @return
      */
     private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
-        // TODO we can reduce the numer of things we have to look at by carefully
-        // distinguishing the different lock modes
+        // TODO we can reduce the number of things we have to look at by
+        // carefully distinguishing the different lock modes
         long holder = resArenaMgr.getLastHolder(resource);
         LockAction res = LockAction.WAIT;
         while (holder != -1) {
@@ -822,6 +821,33 @@
             }
         }
     }
+    
+    private static class DatasetLockCache {
+        private long jobId = -1;
+        private HashMap<Integer,Byte> lockCache =  new HashMap<Integer,Byte>();
+        
+        public boolean contains(final int jobId, final int dsId, byte dsLockMode) {
+            if (this.jobId == jobId) {
+                final Byte cachedLockMode = this.lockCache.get(dsId);
+                if (cachedLockMode != null && cachedLockMode == dsLockMode) {
+                    return true;
+                }            
+            } else {
+                this.jobId = -1;
+                this.lockCache.clear();
+            }
+            return false;
+        }
+        
+        public void put(final int jobId, final int dsId, byte dsLockMode) {
+            this.jobId = jobId;
+            this.lockCache.put(dsId, dsLockMode);
+        }
+        
+        public String toString() {
+            return "[ " + jobId + " : " + lockCache.toString() + "]";
+        }
+    }
 
     private static class ResourceGroupTable {
         public static final int TABLE_SIZE = 10; // TODO increase