add DeadlockTracker
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 3dc4fa6..ba8d86d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -163,10 +164,13 @@
     private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
             final LockAction act, ITransactionContext txnContext) throws ACIDException {
         final Queue queue = act.modify ? upgrader : waiter;
-        if (!introducesDeadlock(resSlot, jobSlot)) {
-            queue.add(reqSlot, resSlot, jobSlot);
+        if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
+            DeadlockTracker tracker = new CollectingTracker();
+            tracker.pushJob(jobSlot);
+            introducesDeadlock(resSlot, jobSlot, tracker);
+            requestAbort(txnContext, tracker.toString());
         } else {
-            requestAbort(txnContext);
+            queue.add(reqSlot, resSlot, jobSlot);
         }
         try {
             group.await(txnContext);
@@ -175,6 +179,64 @@
         }
     }
 
+    interface DeadlockTracker {
+        void pushResource(long resSlot);
+        void pushRequest(long reqSlot);
+        void pushJob(long jobSlot);
+        void pop();
+    }
+    
+    static class NOPTracker implements DeadlockTracker {        
+        static final DeadlockTracker INSTANCE = new NOPTracker();
+
+        public void pushResource(long resSlot) {}
+        public void pushRequest(long reqSlot) {}
+        public void pushJob(long jobSlot) {}
+        public void pop() {}
+    }
+    
+    static class CollectingTracker implements DeadlockTracker {
+        ArrayList<Long> slots = new ArrayList<Long>();
+        ArrayList<String> types = new ArrayList<String>();
+
+        @Override
+        public void pushResource(long resSlot) {
+            types.add("Resource");
+            slots.add(resSlot);
+            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+        }
+
+        @Override
+        public void pushRequest(long reqSlot) {
+            types.add("Request");
+            slots.add(reqSlot);
+            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+        }
+
+        @Override
+        public void pushJob(long jobSlot) {
+            types.add("Job");
+            slots.add(jobSlot);
+            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+        }
+
+        @Override
+        public void pop() {
+            System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            types.remove(types.size() - 1);
+            slots.remove(slots.size() - 1);            
+        }
+        
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < slots.size(); ++i) {
+                sb.append(types.get(i) + " " + slots.get(i) + "\n");
+            }
+            return sb.toString();
+        }
+    }
+        
     /**
      * determine if adding a job to the waiters of a resource will introduce a
      * cycle in the wait-graph where the job waits on itself
@@ -185,11 +247,15 @@
      *            the slot that contains the information about the job
      * @return true if a cycle would be introduced, false otherwise
      */
-    private boolean introducesDeadlock(final long resSlot, final long jobSlot) {
+    private boolean introducesDeadlock(final long resSlot, final long jobSlot,
+            final DeadlockTracker tracker) {
         synchronized (jobArenaMgr) {
+            tracker.pushResource(resSlot);
             long reqSlot = resArenaMgr.getLastHolder(resSlot);
             while (reqSlot >= 0) {
-                long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+                tracker.pushRequest(reqSlot);
+                final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
+                tracker.pushJob(holderJobSlot);
                 if (holderJobSlot == jobSlot) {
                     return true;
                 }
@@ -197,7 +263,7 @@
                 long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
                 while (waiter >= 0) {
                     long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
-                    if (introducesDeadlock(watingOnResSlot, jobSlot)) {
+                    if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) {
                         return true;
                     }
                     waiter = reqArenaMgr.getNextJobRequest(waiter);
@@ -206,12 +272,15 @@
                         waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
                     }
                 }
+                tracker.pop(); // job
+                tracker.pop(); // request
                 reqSlot = reqArenaMgr.getNextRequest(reqSlot);
             }
+            tracker.pop(); // resource
             return false;
         }
     }
-
+    
     @Override
     public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
@@ -834,14 +903,14 @@
         if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
             throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
         } else if (txnContext.isTimeout()) {
-            requestAbort(txnContext);
+            requestAbort(txnContext, "timeout");
         }
     }
 
-    private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+    private void requestAbort(ITransactionContext txnContext, String msg) throws ACIDException {
         txnContext.setTimeout(true);
         throw new ACIDException("Transaction " + txnContext.getJobId()
-                + " should abort (requested by the Lock Manager)");
+                + " should abort (requested by the Lock Manager)" + ":\n" + msg);
     }
 
     public StringBuilder append(StringBuilder sb) {