fix case where the same job re-acquires the exact same lock
add debugging code
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
index 66f366e..bd8dee4 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/Generator.java
@@ -100,6 +100,11 @@
                         final Field field = resource.fields.get(i);
                         field.appendInitializers(sb, indent, 3);
                     }
+                } else if (line.contains("@CHECK_SLOT@")) {
+                    for (int i = 0; i < resource.size(); ++i) {                        
+                        final Field field = resource.fields.get(i);
+                        field.appendChecks(sb, indent, 3);
+                    }
                 } else if (line.contains("@PRINT_BUFFER@")) {
                     resource.appendBufferPrinter(sb, indent, 3);
                     sb.append('\n');
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
index 1606afc..26ca4c4 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordType.java
@@ -50,7 +50,7 @@
             }
             return sb.toString();        
         }
-                
+
         StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
             sb = indent(sb, indent, level);
             sb.append("public ")
@@ -59,7 +59,11 @@
               .append(methodName("get"))
               .append("(int slotNum) {\n");
             sb = indent(sb, indent, level + 1);
-            sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+            sb.append("final Buffer buf = buffers.get(slotNum / NO_SLOTS);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("buf.checkSlot(slotNum % NO_SLOTS);\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buf.bb;\n");
             sb = indent(sb, indent, level + 1);
             sb.append("return b.")
               .append(bbGetter(type))
@@ -155,6 +159,22 @@
             return sb;
         }
         
+        StringBuilder appendChecks(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("if (bb.")
+              .append(bbGetter(type))
+              .append("(itemOffset + ")
+              .append(offsetName())
+              .append(") == ")
+              .append(deadMemInitializer(type))
+              .append(") {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("System.err.println(allocList.get(slotNum));\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
         String offsetName() {
             String words[] = name.split(" ");
             assert(words.length > 0);
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index 01df3f0..801d556 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -178,10 +180,14 @@
     }
 
     static class Buffer {
+        public static final boolean TRACK_ALLOC = true;
+        
         private ByteBuffer bb;
         private int freeSlotNum;
         private int occupiedSlots = -1; //-1 represents 'deinitialized' state.
         
+        private ArrayList<Alloc> allocList;
+        
         Buffer() {
             initialize();
         }
@@ -195,6 +201,13 @@
                 setNextFreeSlot(i, i + 1);
             }
             setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+            
+            if (TRACK_ALLOC) {
+                allocList = new ArrayList<Alloc>(NO_SLOTS);
+                for (int i = 0; i < NO_SLOTS; ++i) {
+                    allocList.add(new Alloc());
+                }
+            }
         }
         
         public void deinitialize() {
@@ -219,6 +232,7 @@
             freeSlotNum = getNextFreeSlot(slotNum);
             @INIT_SLOT@
             occupiedSlots++;
+            if (TRACK_ALLOC) allocList.get(slotNum).alloc();
             return slotNum;
         }
     
@@ -227,6 +241,7 @@
             setNextFreeSlot(slotNum, freeSlotNum);
             freeSlotNum = slotNum;
             occupiedSlots--;
+            if (TRACK_ALLOC) allocList.get(slotNum).free();
         }
 
         public int getNextFreeSlot(int slotNum) {
@@ -252,6 +267,36 @@
             append(sb);
             return sb.toString();
         }
+        
+        private void checkSlot(int slotNum) {
+            final int itemOffset = (slotNum % NO_SLOTS) * ITEM_SIZE;
+            // @CHECK_SLOT@
+        }
+        
+        static class Alloc {
+            String alloc;
+            String free;
+            
+            void alloc() {
+                alloc = getStackTrace();
+            }
+            
+            void free() {
+                free = getStackTrace();
+            }
+
+            private String getStackTrace() {
+                StringWriter sw = new StringWriter();
+                PrintWriter pw = new PrintWriter(sw);
+                new Exception().printStackTrace(pw);
+                pw.close();
+                return sw.toString();                
+            }
+            
+            public String toString() {
+                return "allocation stack:\n" + alloc + "\nfree stack\n" + free;
+            }
+        }
     }
     
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index b1507eb..9679f86 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -56,16 +56,17 @@
     enum LockAction {
         GET,
         UPD, // special version of GET that updates the max lock mode
-        WAIT
+        WAIT,
+        CONV // convert (upgrade) a lock (e.g. from S to X)
     }
     
     static LockAction[][] ACTION_MATRIX = {
         // new    NL              IS               IX                S                X
-        { LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD  }, // NL
-        { LockAction.WAIT, LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.WAIT }, // IS
-        { LockAction.WAIT, LockAction.GET,  LockAction.GET,  LockAction.WAIT, LockAction.WAIT }, // IX
-        { LockAction.WAIT, LockAction.GET,  LockAction.WAIT, LockAction.GET,  LockAction.WAIT }, // S
-        { LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT }  // X
+        { LockAction.GET, LockAction.UPD,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD  }, // NL
+        { LockAction.GET, LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.WAIT }, // IS
+        { LockAction.GET, LockAction.GET,  LockAction.GET,  LockAction.WAIT, LockAction.WAIT }, // IX
+        { LockAction.GET, LockAction.GET,  LockAction.WAIT, LockAction.GET,  LockAction.WAIT }, // S
+        { LockAction.GET, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT }  // X
     };
         
     public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
@@ -90,6 +91,8 @@
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         
+        log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
         if (entityHashValue != -1) {
             // get the intention lock on the dataset, if we want to lock an individual item
             byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
@@ -113,7 +116,11 @@
 
             while (! locked) {
                 int curLockMode = resArenaMgr.getMaxMode(resSlot);
-                switch (ACTION_MATRIX[curLockMode][lockMode]) {
+                LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+                if (act == LockAction.WAIT) {
+                    act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+                }
+                switch (act) {
                     case UPD:
                         resArenaMgr.setMaxMode(resSlot, lockMode);
                         // no break
@@ -122,29 +129,21 @@
                         locked = true;
                         break;
                     case WAIT:
+                        if (! introducesDeadlock(resSlot, jobSlot)) {
+                            addWaiter(reqSlot, resSlot, jobSlot);
+                        } else {
+                            requestAbort(txnContext);
+                        }
+                        group.await(txnContext);
+                        removeWaiter(reqSlot, resSlot, jobSlot);
+                        break;
+                    case CONV:
                         // TODO can we have more than on upgrader? Or do we need to
                         // abort if we get a second upgrader?
-                        boolean upgrade = findLastHolderForJob(resSlot, jobSlot) != -1;
-                        if (upgrade) {
-                            addUpgrader(reqSlot, resSlot, jobSlot);
-                        } else {
-                            if (! introducesDeadlock(resSlot, jobSlot)) {
-                                addWaiter(reqSlot, resSlot, jobSlot);
-                            } else {
-                                requestAbort(txnContext);
-                            }
-                        }
-                        try {
-                            group.await();
-                            if (upgrade) {
-                                removeUpgrader(reqSlot, resSlot, jobSlot);
-                            } else {
-                                removeWaiter(reqSlot, resSlot, jobSlot);
-                            }
-                        } catch (InterruptedException e) {
-                            throw new ACIDException(txnContext, "interrupted", e);
-                        }
-                        break;       
+                        addUpgrader(reqSlot, resSlot, jobSlot);
+                        group.await(txnContext);
+                        removeUpgrader(reqSlot, resSlot, jobSlot);
+                        break;                        
                     default:
                         throw new IllegalStateException();
                 }
@@ -184,6 +183,8 @@
     @Override
     public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
+        log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+
         lock(datasetId, entityHashValue, lockMode, txnContext);
         unlock(datasetId, entityHashValue, txnContext);
     }
@@ -191,6 +192,7 @@
     @Override
     public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
+        log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         
         if (entityHashValue != -1) {
             // get the intention lock on the dataset, if we want to lock an individual item
@@ -203,6 +205,8 @@
         int dsId = datasetId.getId();
         int jobSlot = getJobSlot(txnContext.getJobId().getId());
         
+        boolean locked = false;
+
         ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
 
@@ -212,17 +216,24 @@
             // 2) create a request entry
             int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
             // 3) check lock compatibility
-
+            
             int curLockMode = resArenaMgr.getMaxMode(resSlot);
-            switch (ACTION_MATRIX[curLockMode][lockMode]) {
+            LockAction act = ACTION_MATRIX[curLockMode][lockMode];
+            if (act == LockAction.WAIT) {
+                act = updateActionForSameJob(resSlot, jobSlot, lockMode);
+            }
+            switch (act) {
                 case UPD:
                     resArenaMgr.setMaxMode(resSlot, lockMode);
                     // no break
                 case GET:
                     addHolder(reqSlot, resSlot, jobSlot);
-                    return true;
+                    locked = true;
+                    break;
                 case WAIT:
-                    return false;
+                case CONV:
+                    locked = false;
+                    break;
                 default:
                     throw new IllegalStateException();
             }
@@ -230,11 +241,20 @@
         } finally {
             group.releaseLatch();
         }
+        
+        // if we did acquire the dataset lock, but not the entity lock, we need to remove it
+        if (!locked && entityHashValue != -1) {
+            unlock(datasetId, -1, txnContext);
+        }
+        
+        return locked;
     }
 
     @Override
     public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
             ITransactionContext txnContext) throws ACIDException {
+        log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        
         if (tryLock(datasetId, entityHashValue, lockMode, txnContext)) {
             unlock(datasetId, entityHashValue, txnContext);
             return true;
@@ -244,49 +264,54 @@
 
     @Override
     public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+        log("unlock", datasetId.getId(), entityHashValue, LockMode.NL, txnContext);
 
         ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
 
-        int dsId = datasetId.getId();
-        int resource = findResourceInGroup(group, dsId, entityHashValue);
-        
-        if (resource < 0) {
-            throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
-        }
-        
-        int jobId = txnContext.getJobId().getId();
-        int jobSlot = getJobSlot(jobId);
+        try {
 
-        // since locking is properly nested, finding the last holder for a job is good enough        
-        int holder = removeLastHolder(resource, jobSlot);
-        
-        // deallocate request
-        reqArenaMgr.deallocate(holder);
-        // deallocate resource or fix max lock mode
-        if (resourceNotUsed(resource)) {
-            int prev = group.firstResourceIndex.get();
-            if (prev == resource) {
-                group.firstResourceIndex.set(resArenaMgr.getNext(resource));
-            } else {
-                while (resArenaMgr.getNext(prev) != resource) {
-                    prev = resArenaMgr.getNext(prev);
+            int dsId = datasetId.getId();
+            int resource = findResourceInGroup(group, dsId, entityHashValue);
+
+            if (resource < 0) {
+                throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
+            }
+
+            int jobId = txnContext.getJobId().getId();
+            int jobSlot = getJobSlot(jobId);
+
+            // since locking is properly nested, finding the last holder for a job is good enough        
+            int holder = removeLastHolder(resource, jobSlot);
+
+            // deallocate request
+            reqArenaMgr.deallocate(holder);
+            // deallocate resource or fix max lock mode
+            if (resourceNotUsed(resource)) {
+                int prev = group.firstResourceIndex.get();
+                if (prev == resource) {
+                    group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+                } else {
+                    while (resArenaMgr.getNext(prev) != resource) {
+                        prev = resArenaMgr.getNext(prev);
+                    }
+                    resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
                 }
-                resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+                resArenaMgr.deallocate(resource);
+            } else {
+                final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+                final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
+                resArenaMgr.setMaxMode(resource, newMaxMode);
+                if (oldMaxMode != newMaxMode) {
+                    // the locking mode didn't change, current waiters won't be
+                    // able to acquire the lock, so we do not need to signal them
+                    group.wakeUp();
+                }
             }
-            resArenaMgr.deallocate(resource);
-        } else {
-            final int oldMaxMode = resArenaMgr.getMaxMode(resource);
-            final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
-            resArenaMgr.setMaxMode(resource, newMaxMode);
-            if (oldMaxMode != newMaxMode) {
-                // the locking mode didn't change, current waiters won't be
-                // able to acquire the lock, so we do not need to signal them
-                group.wakeUp();
-            }
+        } finally {
+            group.releaseLatch();
         }
-        group.releaseLatch();
-        
+
         // finally remove the intention lock as well
         if (entityHashValue != -1) {
             // remove the intention lock on the dataset, if we want to lock an individual item
@@ -296,8 +321,14 @@
     
     @Override
     public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+        log("releaseLocks", -1, -1, LockMode.NL, txnContext);
+
         int jobId = txnContext.getJobId().getId();
         Integer jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            // we don't know the job, so there are no locks for it - we're done
+            return;
+        }
         int holder = jobArenaMgr.getLastHolder(jobSlot);
         while (holder != -1) {
             int resource = reqArenaMgr.getResourceId(holder);
@@ -335,10 +366,8 @@
             resSlot = resArenaMgr.allocate();
             resArenaMgr.setDatasetId(resSlot, dsId);
             resArenaMgr.setPkHashVal(resSlot, entityHashValue);
-
-            if (group.firstResourceIndex.get() == -1) {
-                group.firstResourceIndex.set(resSlot);
-            }
+            resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
+            group.firstResourceIndex.set(resSlot);
         }
         return resSlot;
     }
@@ -362,6 +391,35 @@
         return -1;
     }
     
+    /**
+     * when we've got a lock conflict for a different job, we always have to
+     * wait, if it is for the same job we either have to
+     * a) (wait and) convert the lock once conversion becomes viable or
+     * b) acquire the lock if we want to lock the same resource with the same 
+     * lock mode for the same job.
+     * @param resource the resource slot that's being locked
+     * @param job the job slot of the job locking the resource
+     * @param lockMode the lock mode that the resource should be locked with
+     * @return
+     */
+    private LockAction updateActionForSameJob(int resource, int job, byte lockMode) {
+        // TODO we can reduce the numer of things we have to look at by carefully
+        // distinguishing the different lock modes
+        int holder = resArenaMgr.getLastHolder(resource);
+        LockAction res = LockAction.WAIT;
+        while (holder != -1) {
+            if (job == reqArenaMgr.getJobId(holder)) {
+                if (reqArenaMgr.getLockMode(holder) == lockMode) {
+                    return LockAction.GET;
+                } else {
+                    res = LockAction.CONV;
+                }
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return res;
+    }
+    
     private int findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
         int resSlot = group.firstResourceIndex.get();
         while (resSlot != -1) {
@@ -570,6 +628,25 @@
                 && resArenaMgr.getFirstWaiter(resource) == -1;
     }
 
+    private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("{ op : ").append(string);
+        if (id != -1) {
+            sb.append(" , dataset : ").append(id);
+        }
+        if (entityHashValue != -1) {
+            sb.append(" , entity : ").append(entityHashValue);
+        }
+        if (lockMode != LockMode.NL) {
+            sb.append(" , mode : ").append(LockMode.toString(lockMode));
+        }
+        if (txnContext != null) {
+            sb.append(" , jobId : ").append(txnContext.getJobId());            
+        }
+        sb.append(" }");
+        //System.err.println("XXX" + sb.toString());
+    }
+
     private void validateJob(ITransactionContext txnContext) throws ACIDException {
         if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
             throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
@@ -896,10 +973,12 @@
         }
         
         void getLatch() {
+            log("latch " + toString());
             latch.writeLock().lock();
         }
         
         void releaseLatch() {
+            log("release " + toString());
             latch.writeLock().unlock();
         }
         
@@ -907,12 +986,28 @@
             return latch.hasQueuedThreads();
         }
         
-        void await() throws InterruptedException {
-            condition.await();
+        void await(ITransactionContext txnContext) throws ACIDException {
+            log("wait for " + toString());
+            try {
+                condition.await();
+            } catch (InterruptedException e) {
+                throw new ACIDException(txnContext, "interrupted", e);
+            }
         }
         
         void wakeUp() {
+            log("notify " + toString());
             condition.signalAll();
         }
+        
+        void log(String s) {
+            //System.out.println("XXXX " + s);
+        }
+        
+        public String toString() {
+            return "{ id : " + hashCode() 
+                    + ", first : " + firstResourceIndex.toString() 
+                    + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
+        }
     }
 }