add simple deadlock detection
add JobArenaManager
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
index 7c932f4..2768916 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -83,6 +83,14 @@
         request.addField("next request", RecordType.Type.INT, null);
 
         typeMap.put(request.name, request);
+
+        RecordType job = new RecordType("Job");
+        job.addField("job id", RecordType.Type.INT, null);
+        job.addField("last holder", RecordType.Type.INT, "-1");
+        job.addField("last waiter", RecordType.Type.INT, "-1");
+        job.addField("last upgrader", RecordType.Type.INT, "-1");
+
+        typeMap.put(job.name, job);
     }
 
     public void execute() throws MojoExecutionException {
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 67dc5a0..8b2eb06 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -40,7 +40,11 @@
                 <configuration>
                     <arenaManagerTemplate>ArenaManager.java</arenaManagerTemplate>
                     <recordManagerTemplate>RecordManager.java</recordManagerTemplate>
-                    <recordTypes><param>Request</param><param>Resource</param></recordTypes>
+                    <recordTypes>
+                        <param>Job</param>
+                        <param>Request</param>
+                        <param>Resource</param>
+                    </recordTypes>
                     <outputDir>${project.build.directory}/generated-sources/java/edu/uci/ics/asterix/transaction/management/service/locking</outputDir>
                 </configuration>
                 <executions>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index a208a93..d34d301 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -50,7 +50,8 @@
     private ResourceGroupTable table;
     private ResourceArenaManager resArenaMgr;
     private RequestArenaManager reqArenaMgr;
-    private ConcurrentHashMap<Integer, Integer> jobReqMap; // TODO different impl
+    private JobArenaManager jobArenaMgr;
+    private ConcurrentHashMap<Integer, Integer> jobIdSlotMap;
         
     enum LockAction {
         GET,
@@ -77,7 +78,8 @@
 
         resArenaMgr = new ResourceArenaManager();
         reqArenaMgr = new RequestArenaManager();
-        jobReqMap = new ConcurrentHashMap<>();
+        jobArenaMgr = new JobArenaManager();
+        jobIdSlotMap = new ConcurrentHashMap<>();
     }
 
     public AsterixTransactionProperties getTransactionProperties() {
@@ -95,16 +97,17 @@
         }
 
         int dsId = datasetId.getId();
-        int jobId = txnContext.getJobId().getId();
-
+        
+        int jobSlot = getJobSlot(txnContext.getJobId().getId());
+        
         ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
 
         try {
             // 1) Find the resource in the hash table
-            int resSlot = findResource(group, dsId, entityHashValue);
+            int resSlot = getResourceSlot(group, dsId, entityHashValue);
             // 2) create a request entry
-            int reqSlot = createRequest(resSlot, jobId, lockMode);
+            int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
             // 3) check lock compatibility
             boolean locked = false;
 
@@ -115,16 +118,20 @@
                         resArenaMgr.setMaxMode(resSlot, lockMode);
                         // no break
                     case GET:
-                        addHolderToResource(resSlot, reqSlot);
+                        addHolder(reqSlot, resSlot, jobSlot);
                         locked = true;
                         break;
                     case WAIT:
                         // TODO can we have more than on upgrader? Or do we need to
                         // abort if we get a second upgrader?
-                        if (findLastHolderForJob(resSlot, jobId) != -1) {
-                            addUpgraderToResource(resSlot, reqSlot);
+                        if (findLastHolderForJob(resSlot, jobSlot) != -1) {
+                            addUpgrader(reqSlot, resSlot, jobSlot);
                         } else {
-                            addWaiterToResource(resSlot, reqSlot);
+                            if (! introducesDeadlock(resSlot, jobSlot)) {
+                                addWaiter(reqSlot, resSlot, jobSlot);
+                            } else {
+                                requestAbort(txnContext);
+                            }
                         }
                         try {
                             group.await();
@@ -134,14 +141,40 @@
                         break;       
                     default:
                         throw new IllegalStateException();
-                }            
-                // TODO where do we check for deadlocks?
+                }
             }
         } finally {
             group.releaseLatch();
         }
     }
 
+    /**
+     * determine if adding a job to the waiters of a resource will introduce a 
+     * cycle in the wait-graph where the job waits on itself
+     * @param resSlot the slot that contains the information about the resource
+     * @param jobSlot the slot that contains the information about the job
+     * @return true if a cycle would be introduced, false otherwise
+     */
+    private boolean introducesDeadlock(int resSlot, int jobSlot) {
+        int reqSlot = resArenaMgr.getLastHolder(resSlot);
+        while (reqSlot >= 0) {
+            int holderJobSlot = reqArenaMgr.getJobId(reqSlot);
+            if (holderJobSlot == jobSlot) {
+                return true;
+            }
+            int waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+            while (waiter >= 0) {
+                int watingOnResSlot = reqArenaMgr.getResourceId(waiter);
+                if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+                    return true;
+                }
+                waiter = reqArenaMgr.getNextJobRequest(waiter);
+            }
+            reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+        }
+        return false;
+    }
+
     @Override
     public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
@@ -162,16 +195,16 @@
         }
 
         int dsId = datasetId.getId();
-        int jobId = txnContext.getJobId().getId();
-
+        int jobSlot = getJobSlot(txnContext.getJobId().getId());
+        
         ResourceGroup group = table.get(datasetId, entityHashValue);
         group.getLatch();
 
         try {
             // 1) Find the resource in the hash table
-            int resSlot = findResource(group, dsId, entityHashValue);
+            int resSlot = getResourceSlot(group, dsId, entityHashValue);
             // 2) create a request entry
-            int reqSlot = createRequest(resSlot, jobId, lockMode);
+            int reqSlot = getRequestSlot(resSlot, jobSlot, lockMode);
             // 3) check lock compatibility
 
             int curLockMode = resArenaMgr.getMaxMode(resSlot);
@@ -180,7 +213,7 @@
                     resArenaMgr.setMaxMode(resSlot, lockMode);
                     // no break
                 case GET:
-                    addHolderToResource(resSlot, reqSlot);
+                    addHolder(reqSlot, resSlot, jobSlot);
                     return true;
                 case WAIT:
                     return false;
@@ -217,6 +250,7 @@
         }
         
         int jobId = txnContext.getJobId().getId();
+        int jobSlot = getJobSlot(jobId);
         // since locking is properly nested, finding the last holder is good enough
         
         int holder = resArenaMgr.getLastHolder(resource);
@@ -225,7 +259,7 @@
         }
         
         // remove from the list of holders
-        if (reqArenaMgr.getJobId(holder) == jobId) {
+        if (reqArenaMgr.getJobId(holder) == jobSlot) {
             int next = reqArenaMgr.getNextRequest(holder);
             resArenaMgr.setLastHolder(resource, next);
         } else {
@@ -235,7 +269,7 @@
                 if (holder == -1) {
                     throw new IllegalStateException("no holder for resource " + resource + " and job " + jobId);
                 }
-                if (jobId == reqArenaMgr.getJobId(holder)) {
+                if (jobSlot == reqArenaMgr.getJobId(holder)) {
                     break;
                 }
                 prev = holder;
@@ -248,11 +282,7 @@
         int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
         int nextForJob = reqArenaMgr.getNextJobRequest(holder);
         if (prevForJob == -1) {
-            if (nextForJob == -1) {
-                jobReqMap.remove(jobId);
-            } else {
-                jobReqMap.put(jobId, nextForJob);
-            }
+            jobArenaMgr.setLastHolder(jobSlot, nextForJob);
         } else {
             reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
         }
@@ -276,7 +306,7 @@
             resArenaMgr.deallocate(resource);
         } else {
             final int oldMaxMode = resArenaMgr.getMaxMode(resource);
-            final int newMaxMode = getNewMaxMode(resource, oldMaxMode);
+            final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
             resArenaMgr.setMaxMode(resource, newMaxMode);
             if (oldMaxMode != newMaxMode) {
                 // the locking mode didn't change, current waiters won't be
@@ -296,17 +326,36 @@
     @Override
     public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
         int jobId = txnContext.getJobId().getId();
-        Integer head = jobReqMap.get(jobId);
-        while (head != null) {
-            int resource = reqArenaMgr.getResourceId(head);
+        Integer jobSlot = jobIdSlotMap.get(jobId);
+        int holder = jobArenaMgr.getLastHolder(jobSlot);
+        while (holder != -1) {
+            int resource = reqArenaMgr.getResourceId(holder);
             int dsId = resArenaMgr.getDatasetId(resource);
             int pkHashVal = resArenaMgr.getPkHashVal(resource);
             unlock(new DatasetId(dsId), pkHashVal, txnContext);
-            head = jobReqMap.get(jobId);            
+            holder = jobArenaMgr.getLastHolder(jobSlot);
         }
+        jobArenaMgr.deallocate(jobSlot);
     }
         
-    private int findResource(ResourceGroup group, int dsId, int entityHashValue) {
+    private int getJobSlot(int jobId) {
+        Integer jobSlot = jobIdSlotMap.get(jobId);
+        if (jobSlot == null) {
+            jobSlot = new Integer(jobArenaMgr.allocate());
+            Integer oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
+            if (oldSlot != null) {
+                // if another thread allocated a slot for this jobId between
+                // get(..) and putIfAbsent(..), we'll use that slot and
+                // deallocate the one we allocated
+                jobArenaMgr.deallocate(jobSlot);
+                jobSlot = oldSlot;
+            }
+        }
+        assert(jobSlot >= 0);
+        return jobSlot;
+    }
+
+    private int getResourceSlot(ResourceGroup group, int dsId, int entityHashValue) {
         int resSlot = findResourceInGroup(group, dsId, entityHashValue);
         
         if (resSlot == -1) {
@@ -322,29 +371,11 @@
         return resSlot;
     }
 
-    private int createRequest(int resSlot, int jobId, byte lockMode) {
+    private int getRequestSlot(int resSlot, int jobSlot, byte lockMode) {
         int reqSlot = reqArenaMgr.allocate();
         reqArenaMgr.setResourceId(reqSlot, resSlot);
         reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
-        reqArenaMgr.setJobId(reqSlot, jobId);
-        
-        int prevHead = -1;
-        Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
-        while (headOfJobReqQueue != null) {
-            // TODO make sure this works (even if the job gets removed from the table)
-            if (jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
-                prevHead = headOfJobReqQueue;
-                break;
-            }
-            headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
-        }
-
-        // this goes across arenas
-        reqArenaMgr.setNextJobRequest(reqSlot, prevHead);
-        reqArenaMgr.setPrevJobRequest(reqSlot, -1);
-        if (prevHead >= 0) {
-          reqArenaMgr.setPrevJobRequest(prevHead, reqSlot);
-        }
+        reqArenaMgr.setJobId(reqSlot, jobSlot);
         return reqSlot;
     }
 
@@ -374,13 +405,17 @@
         return -1;        
     }
     
-    private void addHolderToResource(int resource, int request) {
-        final int lastHolder = resArenaMgr.getLastHolder(resource);
+    private void addHolder(int request, int resource, int job) {
+        int lastHolder = resArenaMgr.getLastHolder(resource);
         reqArenaMgr.setNextRequest(request, lastHolder);
         resArenaMgr.setLastHolder(resource, request);
+        
+        lastHolder = jobArenaMgr.getLastHolder(job);
+        insertIntoJobQueue(request, lastHolder);
+        jobArenaMgr.setLastHolder(resource, request);
     }
 
-    private void addWaiterToResource(int resource, int request) {
+    private void addWaiter(int request, int resource, int job) {
         int waiter = resArenaMgr.getFirstWaiter(resource);
         reqArenaMgr.setNextRequest(request, -1);
         if (waiter == -1) {
@@ -388,9 +423,13 @@
         } else {
             appendToRequestQueue(waiter, request);
         }
+        
+        waiter = jobArenaMgr.getLastWaiter(job);
+        insertIntoJobQueue(request, waiter);
+        jobArenaMgr.setLastWaiter(job, request);
     }
 
-    private void addUpgraderToResource(int resource, int request) {
+    private void addUpgrader(int request, int resource, int job) {
         int upgrader = resArenaMgr.getFirstUpgrader(resource);
         reqArenaMgr.setNextRequest(request, -1);
         if (upgrader == -1) {
@@ -398,6 +437,18 @@
         } else {
             appendToRequestQueue(upgrader, request);
         }
+
+        upgrader = jobArenaMgr.getLastUpgrader(job);
+        insertIntoJobQueue(request, upgrader);
+        jobArenaMgr.setLastUpgrader(job, request);
+    }
+
+    private void insertIntoJobQueue(int newRequest, int oldRequest) {
+        reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
+        reqArenaMgr.setPrevJobRequest(newRequest, -1);
+        if (oldRequest >= 0) {
+            reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
+        }
     }
 
     private void appendToRequestQueue(int head, int appendee) {
@@ -409,7 +460,7 @@
         reqArenaMgr.setNextRequest(head, appendee);        
     }
     
-    private int getNewMaxMode(int resource, int oldMaxMode) {
+    private int determineNewMaxMode(int resource, int oldMaxMode) {
         int newMaxMode = LockMode.NL;
         int holder = resArenaMgr.getLastHolder(resource);
         while (holder != -1) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 09d7000..2426927 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.asterix.common.transactions.ILockManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -151,6 +152,7 @@
         if (isSuccess) {
             log("\n*** Test Passed ***");
         }
+        ((LogManager) txnProvider.getLogManager()).stop(false, null);
     }
 
     public boolean handleRequest(LockRequest request) throws ACIDException {