more efficient implementations for instantLock and instantTryLock
add method to determine the "intention mode" of a lock mode
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 9f98680..44bc607 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -108,7 +108,6 @@
     @Override
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
-        
         log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         
         final int dsId = datasetId.getId();        
@@ -116,30 +115,25 @@
         
         if (entityHashValue != -1) {
             // get the intention lock on the dataset, if we want to lock an individual item
-            final byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
             if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
                 lock(datasetId, -1, dsLockMode, txnContext);
                 dsLockCache.get().put(jobId, dsId, dsLockMode);
             }
         }
 
-        long jobSlot = findOrAllocJobSlot(jobId);
+        final long jobSlot = findOrAllocJobSlot(jobId);
         
-        ResourceGroup group = table.get(datasetId, entityHashValue);
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
-
         try {
             validateJob(txnContext);
             
-            // 1) Find the resource in the hash table
-            long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
-            // 2) create a request entry
-            long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
-            // 3) check lock compatibility
+            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
             boolean locked = false;
-
             while (! locked) {
-                LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+                final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
                 switch (act) {
                     case UPD:
                         resArenaMgr.setMaxMode(resSlot, lockMode);
@@ -150,17 +144,7 @@
                         break;
                     case WAIT:
                     case CONV:
-                        final Queue queue = act.modify ? Upgrader : Waiter;
-                        if (! introducesDeadlock(resSlot, jobSlot)) {
-                            queue.add(reqSlot, resSlot, jobSlot);
-                        } else {
-                            requestAbort(txnContext);
-                        }
-                        try {
-                            group.await(txnContext);
-                        } finally {
-                            queue.remove(reqSlot, resSlot, jobSlot);
-                        }
+                        enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
                         break;
                     case ERR:
                     default:
@@ -172,6 +156,22 @@
         }
     }
 
+    private void enqueueWaiter(final ResourceGroup group, final long reqSlot,
+            final long resSlot, final long jobSlot, final LockAction act,
+            ITransactionContext txnContext) throws ACIDException {
+        final Queue queue = act.modify ? Upgrader : Waiter;
+        if (! introducesDeadlock(resSlot, jobSlot)) {
+            queue.add(reqSlot, resSlot, jobSlot);
+        } else {
+            requestAbort(txnContext);
+        }
+        try {
+            group.await(txnContext);
+        } finally {
+            queue.remove(reqSlot, resSlot, jobSlot);
+        }
+    }
+
     /**
      * determine if adding a job to the waiters of a resource will introduce a 
      * cycle in the wait-graph where the job waits on itself
@@ -179,7 +179,7 @@
      * @param jobSlot the slot that contains the information about the job
      * @return true if a cycle would be introduced, false otherwise
      */
-    private boolean introducesDeadlock(long resSlot, long jobSlot) {
+    private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
         long reqSlot = resArenaMgr.getLastHolder(resSlot);
         while (reqSlot >= 0) {
             long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
@@ -208,9 +208,69 @@
     public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
+        final int dsId = datasetId.getId();        
+        final int jobId = txnContext.getJobId().getId();
+        
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                lock(datasetId, -1, dsLockMode, txnContext);
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
 
-        lock(datasetId, entityHashValue, lockMode, txnContext);
-        unlock(datasetId, entityHashValue, lockMode, txnContext);
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        if (group.firstResourceIndex.get() == -1l) {
+            validateJob(txnContext);
+            // if we do not have a resource in the group, we know that the
+            // resource that we are looking for is not locked 
+            return;
+        }
+        
+        // we only allocate a request slot if we actually have to wait
+        long reqSlot = -1;
+
+        group.getLatch();
+        try {
+            validateJob(txnContext);
+            
+            final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+            if (resSlot < 0) {
+                // if we don't find the resource, there are no locks on it.
+                return;
+            }
+
+            final long jobSlot = findOrAllocJobSlot(jobId);
+            
+            boolean locked = false;
+            while (! locked) {
+                final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+                switch (act) {
+                    case UPD:
+                    case GET:
+                        locked = true;
+                        break;
+                    case WAIT:
+                    case CONV:
+                        if (reqSlot == -1) {
+                            reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
+                        }
+                        enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+                        break;
+                    case ERR:
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+        } finally {
+            if (reqSlot != -1) {
+                // deallocate request, if we allocated one earlier
+                reqArenaMgr.deallocate(reqSlot);
+            }
+            group.releaseLatch();
+        }
     }
 
     @Override
@@ -223,7 +283,7 @@
 
         if (entityHashValue != -1) {
             // get the intention lock on the dataset, if we want to lock an individual item
-            byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
             if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
                 if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
                     return false;
@@ -232,35 +292,28 @@
             }
         }
 
-        long jobSlot = findOrAllocJobSlot(jobId);
+        final long jobSlot = findOrAllocJobSlot(jobId);
         
-        boolean locked = false;
-
-        ResourceGroup group = table.get(datasetId, entityHashValue);
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
 
         try {
             validateJob(txnContext);
 
-            // 1) Find the resource in the hash table
-            long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
-            // 2) create a request entry
-            long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
-            // 3) check lock compatibility
+            final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
+            final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
             
-            LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+            final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
             switch (act) {
                 case UPD:
                     resArenaMgr.setMaxMode(resSlot, lockMode);
                     // no break
                 case GET:
                     addHolder(reqSlot, resSlot, jobSlot);
-                    locked = true;
-                    break;
+                    return true;
                 case WAIT:
                 case CONV:
-                    locked = false;
-                    break;
+                    return false;
                 default:
                     throw new IllegalStateException();
             }
@@ -270,8 +323,6 @@
         
         // if we did acquire the dataset lock, but not the entity lock, we keep
         // it anyway and clean it up at the end of the job
-        
-        return locked;
     }
 
     @Override
@@ -279,11 +330,55 @@
             ITransactionContext txnContext) throws ACIDException {
         log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         
-        if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
-            unlock(datasetId, entityHashValue, lockMode, txnContext);
+        final int dsId = datasetId.getId();
+        final int jobId = txnContext.getJobId().getId();
+
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            final byte dsLockMode = LockMode.intentionMode(lockMode);
+            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
+                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
+                    return false;
+                }
+                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            }
+        }
+
+        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        if (group.firstResourceIndex.get() == -1l) {
+            validateJob(txnContext);
+            // if we do not have a resource in the group, we know that the
+            // resource that we are looking for is not locked 
             return true;
         }
-        return false;
+        
+        group.getLatch();
+        try {
+            validateJob(txnContext);
+            
+            final long resSlot = findResourceInGroup(group, dsId, entityHashValue);
+            if (resSlot < 0) {
+                // if we don't find the resource, there are no locks on it.
+                return true;
+            }
+
+            final long jobSlot = findOrAllocJobSlot(jobId);
+            
+            LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
+            switch (act) {
+                case UPD:
+                case GET:
+                    return true;
+                case WAIT:
+                case CONV:
+                    return false;
+                case ERR:
+                default:
+                    throw new IllegalStateException();
+                }
+        } finally {
+            group.releaseLatch();
+        }
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 1a73fe0..c17689b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -37,6 +37,15 @@
             public static final byte S  = 3;
             public static final byte X  = 4;
             
+            public static byte intentionMode(byte mode) {
+                switch (mode) {
+                    case S:  return IS;
+                    case X:  return IX;
+                    default: throw new IllegalArgumentException(
+                            "no intention lock mode for " + toString(mode));
+                }                
+            }
+            
             public static String toString(byte mode) {
                 switch (mode) {
                     case NL: return "NL";