ASTERIXDB-1118: allow for lock conversion

Also improve debuggability of ConcurrentLockManager and add new unit tests.

Change-Id: If49ed8d48fa8c71a52c880d4f42a2badbe6a57d7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/474
Reviewed-by: Taewoo Kim <wangsaeu@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index e5ad1b4..ae4aaa5 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -116,6 +116,6 @@
             <artifactId>mockito-all</artifactId>
             <version>1.10.19</version>
         </dependency>
-	</dependencies>
+    </dependencies>
 
 </project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 3114195..af508f5 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -19,6 +19,14 @@
 
 package org.apache.asterix.transaction.management.service.locking;
 
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -31,30 +39,20 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.AsterixTransactionProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILockManager;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-
 /**
- * An implementation of the ILockManager interface.
+ * A concurrent implementation of the ILockManager interface.
  *
- * @author tillw
+ * @see ResourceGroupTable
+ * @see ResourceGroup
  */
 public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
 
-    private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
-    private static final Level LVL = Level.FINER;
+    static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
+    static final Level LVL = Level.FINER;
 
     public static final boolean DEBUG_MODE = false;//true
     public static final boolean CHECK_CONSISTENCY = false;
 
-    private TransactionSubsystem txnSubsystem;
     private ResourceGroupTable table;
     private ResourceArenaManager resArenaMgr;
     private RequestArenaManager reqArenaMgr;
@@ -81,22 +79,21 @@
 
     static LockAction[][] ACTION_MATRIX = {
             // new    NL              IS               IX                S                X
-            { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
-            { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
-            { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
-            { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
-            { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+            {LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD}, // NL
+            {LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT}, // IS
+            {LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT}, // IX
+            {LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT}, // S
+            {LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT} // X
     };
 
-    public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
-        this.txnSubsystem = txnSubsystem;
+    public ConcurrentLockManager(final int lockManagerShrinkTimer) throws ACIDException {
+        this(lockManagerShrinkTimer, Runtime.getRuntime().availableProcessors() * 2, 1024);
+        // TODO increase table size?
+    }
 
-        this.table = new ResourceGroupTable();
-
-        final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer();
-
-        int noArenas = Runtime.getRuntime().availableProcessors() * 2;
-
+    public ConcurrentLockManager(final int lockManagerShrinkTimer, final int noArenas, final int tableSize) throws
+            ACIDException {
+        this.table = new ResourceGroupTable(tableSize);
         resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer);
         reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer);
         jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
@@ -108,10 +105,6 @@
         };
     }
 
-    public AsterixTransactionProperties getTransactionProperties() {
-        return this.txnSubsystem.getTransactionProperties();
-    }
-
     @Override
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
@@ -142,6 +135,18 @@
             while (!locked) {
                 final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
                 switch (act) {
+                    case CONV:
+                        if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
+                            DeadlockTracker tracker = new CollectingTracker();
+                            tracker.pushJob(jobSlot);
+                            introducesDeadlock(resSlot, jobSlot, tracker);
+                            requestAbort(txnContext, tracker.toString());
+                            break;
+                        } else if (hasOtherHolders(resSlot, jobSlot)) {
+                            enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+                            break;
+                        }
+                        //no break
                     case UPD:
                         resArenaMgr.setMaxMode(resSlot, lockMode);
                         // no break
@@ -150,7 +155,6 @@
                         locked = true;
                         break;
                     case WAIT:
-                    case CONV:
                         enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
                         break;
                     case ERR:
@@ -161,6 +165,8 @@
             if (entityHashValue == -1) {
                 dsLockCache.get().put(jobId, dsId, lockMode);
             }
+        } catch (InterruptedException e) {
+            throw new WaitInterruptedException(txnContext, "interrupted", e);
         } finally {
             group.releaseLatch();
         }
@@ -170,7 +176,8 @@
     }
 
     private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
-            final LockAction act, ITransactionContext txnContext) throws ACIDException {
+                               final LockAction act, ITransactionContext txnContext) throws ACIDException,
+            InterruptedException {
         final Queue queue = act.modify ? upgrader : waiter;
         if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
             DeadlockTracker tracker = new CollectingTracker();
@@ -214,6 +221,9 @@
     }
 
     static class CollectingTracker implements DeadlockTracker {
+
+        static final boolean DEBUG = false;
+
         ArrayList<Long> slots = new ArrayList<Long>();
         ArrayList<String> types = new ArrayList<String>();
 
@@ -221,26 +231,26 @@
         public void pushResource(long resSlot) {
             types.add("Resource");
             slots.add(resSlot);
-            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
         }
 
         @Override
         public void pushRequest(long reqSlot) {
             types.add("Request");
             slots.add(reqSlot);
-            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
         }
 
         @Override
         public void pushJob(long jobSlot) {
             types.add("Job");
             slots.add(jobSlot);
-            System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
         }
 
         @Override
         public void pop() {
-            System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+            if (DEBUG) System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
             types.remove(types.size() - 1);
             slots.remove(slots.size() - 1);
         }
@@ -257,15 +267,19 @@
 
     /**
      * determine if adding a job to the waiters of a resource will introduce a
-     * cycle in the wait-graph where the job waits on itself
-     * 
-     * @param resSlot
-     *            the slot that contains the information about the resource
-     * @param jobSlot
-     *            the slot that contains the information about the job
+     * cycle in the wait-graph where the job waits on itself - but not directly on itself (which happens e.g. in the
+     * case of upgrading a lock from S to X).
+     *
+     * @param resSlot the slot that contains the information about the resource
+     * @param jobSlot the slot that contains the information about the job
      * @return true if a cycle would be introduced, false otherwise
      */
     private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker) {
+        return introducesDeadlock(resSlot, jobSlot, tracker, 0);
+    }
+
+    private boolean introducesDeadlock(final long resSlot, final long jobSlot,
+                                       final DeadlockTracker tracker, final int depth) {
         synchronized (jobArenaMgr) {
             tracker.pushResource(resSlot);
             long reqSlot = resArenaMgr.getLastHolder(resSlot);
@@ -273,14 +287,22 @@
                 tracker.pushRequest(reqSlot);
                 final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
                 tracker.pushJob(holderJobSlot);
-                if (holderJobSlot == jobSlot) {
+                if (holderJobSlot == jobSlot && depth != 0) {
                     return true;
                 }
+
+                // To determine if we have a deadlock we need to look at the waiters and at the upgraders.
+                // The scanWaiters flag indicates if we are currently scanning the waiters (true) or the upgraders
+                // (false).
                 boolean scanWaiters = true;
                 long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+                if (waiter < 0 && scanWaiters) {
+                    scanWaiters = false;
+                    waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
+                }
                 while (waiter >= 0) {
-                    long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
-                    if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) {
+                    long waitingOnResSlot = reqArenaMgr.getResourceId(waiter);
+                    if (introducesDeadlock(waitingOnResSlot, jobSlot, tracker, depth + 1)) {
                         return true;
                     }
                     waiter = reqArenaMgr.getNextJobRequest(waiter);
@@ -289,6 +311,7 @@
                         waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
                     }
                 }
+
                 tracker.pop(); // job
                 tracker.pop(); // request
                 reqSlot = reqArenaMgr.getNextRequest(reqSlot);
@@ -354,6 +377,8 @@
                         throw new IllegalStateException();
                 }
             }
+        } catch (InterruptedException e) {
+            throw new WaitInterruptedException(txnContext, "interrupted", e);
         } finally {
             if (reqSlot != -1) {
                 // deallocate request, if we allocated one earlier
@@ -422,7 +447,7 @@
 
     @Override
     public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
-            ITransactionContext txnContext) throws ACIDException {
+                                  ITransactionContext txnContext) throws ACIDException {
         log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
         stats.instantTryLock();
 
@@ -524,17 +549,15 @@
                 final int oldMaxMode = resArenaMgr.getMaxMode(resource);
                 final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
                 resArenaMgr.setMaxMode(resource, newMaxMode);
-                if (oldMaxMode != newMaxMode) {
-                    // the locking mode didn't change, current waiters won't be
-                    // able to acquire the lock, so we do not need to signal them
-                    group.wakeUp();
-                }
+                group.wakeUp();
             }
         } finally {
             group.releaseLatch();
         }
 
-        // dataset intention locks are cleaned up at the end of the job
+        // dataset intention locks are
+        // a) kept in dsLockCache and
+        // b) cleaned up only in releaseLocks at the end of the job
     }
 
     @Override
@@ -606,13 +629,15 @@
             resArenaMgr.setPkHashVal(resSlot, entityHashValue);
             resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
             group.firstResourceIndex.set(resSlot);
-            if (DEBUG_MODE)
-                LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue
-                        + ")");
+            if (DEBUG_MODE) {
+                LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " +
+                        entityHashValue + ")");
+            }
         } else {
-            if (DEBUG_MODE)
-                LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue
-                        + ")");
+            if (DEBUG_MODE) {
+                LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " +
+                        entityHashValue + ")");
+            }
         }
         return resSlot;
     }
@@ -644,13 +669,10 @@
      * a) (wait and) convert the lock once conversion becomes viable or
      * b) acquire the lock if we want to lock the same resource with the same
      * lock mode for the same job.
-     * 
-     * @param resource
-     *            the resource slot that's being locked
-     * @param job
-     *            the job slot of the job locking the resource
-     * @param lockMode
-     *            the lock mode that the resource should be locked with
+     *
+     * @param resource the resource slot that's being locked
+     * @param job      the job slot of the job locking the resource
+     * @param lockMode the lock mode that the resource should be locked with
      * @return
      */
     private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
@@ -698,6 +720,17 @@
         }
     }
 
+    private boolean hasOtherHolders(long resSlot, long jobSlot) {
+        long holder = resArenaMgr.getLastHolder(resSlot);
+        while (holder != -1) {
+            if (reqArenaMgr.getJobSlot(holder) != jobSlot) {
+                return true;
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return false;
+    }
+
     private long removeLastHolder(long resource, long jobSlot, byte lockMode) {
         long holder = resArenaMgr.getLastHolder(resource);
         if (holder < 0) {
@@ -848,13 +881,10 @@
      * remove the first request for a given job and lock mode from a request queue.
      * If the value of the parameter lockMode is LockMode.ANY the first request
      * for the job is removed - independent of the LockMode.
-     * 
-     * @param head
-     *            the head of the request queue
-     * @param jobSlot
-     *            the job slot
-     * @param lockMode
-     *            the lock mode
+     *
+     * @param head     the head of the request queue
+     * @param jobSlot  the job slot
+     * @param lockMode the lock mode
      * @return the slot of the first request that matched the given job
      */
     private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) {
@@ -947,30 +977,34 @@
     }
 
     private void assertLocksCanBefoundInJobQueue() throws ACIDException {
-        for (int i = 0; i < ResourceGroupTable.TABLE_SIZE; ++i) {
-            final ResourceGroup group = table.get(i);
-            if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
-                try {
-                    long resSlot = group.firstResourceIndex.get();
-                    while (resSlot != -1) {
-                        int dsId = resArenaMgr.getDatasetId(resSlot);
-                        int entityHashValue = resArenaMgr.getPkHashVal(resSlot);
-                        long reqSlot = resArenaMgr.getLastHolder(resSlot);
-                        while (reqSlot != -1) {
-                            byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot);
-                            long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
-                            int jobId = jobArenaMgr.getJobId(jobSlot);
-                            assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
-                            reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+        try {
+            for (int i = 0; i < table.size; ++i) {
+                final ResourceGroup group = table.get(i);
+                if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
+                    try {
+                        long resSlot = group.firstResourceIndex.get();
+                        while (resSlot != -1) {
+                            int dsId = resArenaMgr.getDatasetId(resSlot);
+                            int entityHashValue = resArenaMgr.getPkHashVal(resSlot);
+                            long reqSlot = resArenaMgr.getLastHolder(resSlot);
+                            while (reqSlot != -1) {
+                                byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot);
+                                long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
+                                int jobId = jobArenaMgr.getJobId(jobSlot);
+                                assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
+                                reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+                            }
+                            resSlot = resArenaMgr.getNext(resSlot);
                         }
-                        resSlot = resArenaMgr.getNext(resSlot);
+                    } finally {
+                        group.releaseLatch();
                     }
-                } finally {
-                    group.releaseLatch();
+                } else {
+                    LOGGER.warning("Could not check locks for " + group);
                 }
-            } else {
-                LOGGER.warning("Could not check locks for " + group);
             }
+        } catch (InterruptedException e) {
+            throw new IllegalStateException("interrupted", e);
         }
     }
 
@@ -986,7 +1020,7 @@
 
     /**
      * tries to find a lock request searching though the job queue
-     * 
+     *
      * @param dsId
      *            dataset id
      * @param entityHashValue
@@ -1021,66 +1055,30 @@
         return -1;
     }
 
-    private String resQueueToString(long resSlot) {
-        return appendResQueue(new StringBuilder(), resSlot).toString();
+    private TablePrinter getResourceTablePrinter() {
+        return new ResourceTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr);
     }
 
-    private StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
-        resArenaMgr.appendRecord(sb, resSlot);
-        sb.append("\n");
-        appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot));
-        return sb;
+    private TablePrinter getDumpTablePrinter() {
+        return new DumpTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr, jobIdSlotMap);
     }
 
-    private StringBuilder appendReqQueue(StringBuilder sb, long head) {
-        while (head != -1) {
-            reqArenaMgr.appendRecord(sb, head);
-            sb.append("\n");
-            head = reqArenaMgr.getNextRequest(head);
-        }
-        return sb;
-    }
-
-    public StringBuilder append(StringBuilder sb) {
-        table.getAllLatches();
-        try {
-            sb.append(">>dump_begin\t>>----- [resTable] -----\n");
-            table.append(sb);
-            sb.append(">>dump_end\t>>----- [resTable] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
-            resArenaMgr.append(sb);
-            sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
-            reqArenaMgr.append(sb);
-            sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
-            for (Integer i : jobIdSlotMap.keySet()) {
-                sb.append(i).append(" : ");
-                TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
-                sb.append("\n");
-            }
-            sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
-
-            sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
-            jobArenaMgr.append(sb);
-            sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
-        } finally {
-            table.releaseAllLatches();
-        }
-        return sb;
+    public String printByResource() {
+        return getResourceTablePrinter().append(new StringBuilder()).append("\n").toString();
     }
 
     public String toString() {
-        return append(new StringBuilder()).toString();
+        return printByResource();
+    }
+
+    public String dump() {
+        return getDumpTablePrinter().append(new StringBuilder()).toString();
     }
 
     @Override
     public String prettyPrint() throws ACIDException {
         StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
-        return append(s).toString() + "\n";
+        return getDumpTablePrinter().append(s).toString() + "\n";
     }
 
     @Override
@@ -1090,7 +1088,7 @@
 
     @Override
     public void dumpState(OutputStream os) throws IOException {
-        os.write(toString().getBytes());
+        os.write(dump().getBytes());
     }
 
     @Override
@@ -1140,119 +1138,5 @@
         }
     }
 
-    private static class ResourceGroupTable {
-        public static final int TABLE_SIZE = 1024; // TODO increase?
-
-        private ResourceGroup[] table;
-
-        public ResourceGroupTable() {
-            table = new ResourceGroup[TABLE_SIZE];
-            for (int i = 0; i < TABLE_SIZE; ++i) {
-                table[i] = new ResourceGroup();
-            }
-        }
-
-        ResourceGroup get(int dId, int entityHashValue) {
-            // TODO ensure good properties of hash function
-            int h = Math.abs(dId ^ entityHashValue);
-            if (h < 0)
-                h = 0;
-            return table[h % TABLE_SIZE];
-        }
-
-        ResourceGroup get(int i) {
-            return table[i];
-        }
-
-        public void getAllLatches() {
-            for (int i = 0; i < TABLE_SIZE; ++i) {
-                table[i].getLatch();
-            }
-        }
-
-        public void releaseAllLatches() {
-            for (int i = 0; i < TABLE_SIZE; ++i) {
-                table[i].releaseLatch();
-            }
-        }
-
-        public StringBuilder append(StringBuilder sb) {
-            return append(sb, false);
-        }
-
-        public StringBuilder append(StringBuilder sb, boolean detail) {
-            for (int i = 0; i < table.length; ++i) {
-                sb.append(i).append(" : ");
-                if (detail) {
-                    sb.append(table[i]);
-                } else {
-                    sb.append(table[i].firstResourceIndex);
-                }
-                sb.append('\n');
-            }
-            return sb;
-        }
-    }
-
-    private static class ResourceGroup {
-        private ReentrantReadWriteLock latch;
-        private Condition condition;
-        AtomicLong firstResourceIndex;
-
-        ResourceGroup() {
-            latch = new ReentrantReadWriteLock();
-            condition = latch.writeLock().newCondition();
-            firstResourceIndex = new AtomicLong(-1);
-        }
-
-        void getLatch() {
-            log("latch");
-            latch.writeLock().lock();
-        }
-
-        boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException {
-            log("tryLatch");
-            try {
-                return latch.writeLock().tryLock(timeout, unit);
-            } catch (InterruptedException e) {
-                LOGGER.finer("interrupted while wating on ResourceGroup");
-                throw new ACIDException("interrupted", e);
-            }
-        }
-
-        void releaseLatch() {
-            log("release");
-            latch.writeLock().unlock();
-        }
-
-        boolean hasWaiters() {
-            return latch.hasQueuedThreads();
-        }
-
-        void await(ITransactionContext txnContext) throws ACIDException {
-            log("wait for");
-            try {
-                condition.await();
-            } catch (InterruptedException e) {
-                LOGGER.finer("interrupted while wating on ResourceGroup");
-                throw new ACIDException(txnContext, "interrupted", e);
-            }
-        }
-
-        void wakeUp() {
-            log("notify");
-            condition.signalAll();
-        }
-
-        void log(String s) {
-            if (LOGGER.isLoggable(LVL)) {
-                LOGGER.log(LVL, s + " " + toString());
-            }
-        }
-
-        public String toString() {
-            return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get())
-                    + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
-        }
-    }
 }
+
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
new file mode 100644
index 0000000..ffdb151
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DumpTablePrinter implements TablePrinter {
+    private ResourceGroupTable table;
+    private ResourceArenaManager resArenaMgr;
+    private RequestArenaManager reqArenaMgr;
+    private JobArenaManager jobArenaMgr;
+    private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+
+    DumpTablePrinter(ResourceGroupTable table,
+                     ResourceArenaManager resArenaMgr,
+                     RequestArenaManager reqArenaMgr,
+                     JobArenaManager jobArenaMgr,
+                     ConcurrentHashMap<Integer, Long> jobIdSlotMap) {
+        this.table = table;
+        this.resArenaMgr = resArenaMgr;
+        this.reqArenaMgr = reqArenaMgr;
+        this.jobArenaMgr = jobArenaMgr;
+        this.jobIdSlotMap = jobIdSlotMap;
+    }
+
+    public StringBuilder append(StringBuilder sb) {
+        table.getAllLatches();
+        try {
+            sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+            table.append(sb);
+            sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+            resArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+            reqArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+            for (Integer i : jobIdSlotMap.keySet()) {
+                sb.append(i).append(" : ");
+                TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
+                sb.append("\n");
+            }
+            sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+            sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+            jobArenaMgr.append(sb);
+            sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+        } finally {
+            table.releaseAllLatches();
+        }
+        return sb;
+    }
+
+    String resQueueToString(long resSlot) {
+        return appendResQueue(new StringBuilder(), resSlot).toString();
+    }
+
+    StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
+        resArenaMgr.appendRecord(sb, resSlot);
+        sb.append("\n");
+        appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot));
+        return sb;
+    }
+
+    StringBuilder appendReqQueue(StringBuilder sb, long head) {
+        while (head != -1) {
+            reqArenaMgr.appendRecord(sb, head);
+            sb.append("\n");
+            head = reqArenaMgr.getNextRequest(head);
+        }
+        return sb;
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index aaa96bb..1dbf16b 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -392,7 +392,6 @@
 class LockRequestWorker implements Runnable {
 
     String threadName;
-    TransactionSubsystem txnProvider;
     ILockManager lockMgr;
     WorkerReadyQueue workerReadyQueue;
     LockRequest lockRequest;
@@ -401,7 +400,6 @@
     boolean isDone;
 
     public LockRequestWorker(TransactionSubsystem txnProvider, WorkerReadyQueue workerReadyQueue, String threadName) {
-        this.txnProvider = txnProvider;
         this.lockMgr = txnProvider.getLockManager();
         this.workerReadyQueue = workerReadyQueue;
         this.threadName = new String(threadName);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java
new file mode 100644
index 0000000..bec4e53
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.transactions.ITransactionContext;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A ResourceGroup represents a group of resources that are managed by a ConcurrentLockManager.
+ * All resources in a group share a common latch. I.e. all modifications of lock requests for any resource in a group
+ * are protected by the same latch.
+ *
+ * @see ConcurrentLockManager
+ */
+class ResourceGroup {
+    // the write lock of this latch is the group's latch; the read lock is not used in this class
+    private final ReentrantReadWriteLock latch = new ReentrantReadWriteLock();
+    // bound to the write lock, so await() and wakeUp() must be called while holding the latch
+    private final Condition condition = latch.writeLock().newCondition();
+    // initially -1; within this class it is only read (by toString)
+    AtomicLong firstResourceIndex = new AtomicLong(-1);
+
+    void getLatch() { // logs, then blocks until the write latch is held
+        log("latch");
+        latch.writeLock().lock();
+    }
+
+    // Timed variant of getLatch(): returns false if the latch was not acquired within the timeout.
+    boolean tryLatch(long timeout, TimeUnit unit) throws InterruptedException {
+        log("tryLatch");
+        try {
+            return latch.writeLock().tryLock(timeout, unit);
+        } catch (InterruptedException e) {
+            ConcurrentLockManager.LOGGER.finer("interrupted while waiting on ResourceGroup");
+            throw e;
+        }
+    }
+
+    void releaseLatch() { // must be called by the thread that currently holds the latch
+        log("release");
+        latch.writeLock().unlock();
+    }
+
+    boolean hasWaiters() { // true if threads are queued to acquire the latch (not those parked in await())
+        return latch.hasQueuedThreads();
+    }
+
+    // Waits on the latch's condition until signalled, interrupted or woken spuriously; txnContext is unused.
+    void await(ITransactionContext txnContext) throws InterruptedException {
+        log("wait for");
+        try {
+            condition.await();
+        } catch (InterruptedException e) {
+            ConcurrentLockManager.LOGGER.finer("interrupted while waiting on ResourceGroup");
+            throw e;
+        }
+    }
+
+    void wakeUp() { // signals every thread currently parked in await()
+        log("notify");
+        condition.signalAll();
+    }
+
+    void log(String s) {
+        if (ConcurrentLockManager.LOGGER.isLoggable(ConcurrentLockManager.LVL)) {
+            ConcurrentLockManager.LOGGER.log(ConcurrentLockManager.LVL, s + " " + toString());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) +
+                ", waiters : " + hasWaiters() + " }";
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java
new file mode 100644
index 0000000..213ccd9
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.locking;
+
+/**
+ * A hash table for ResourceGroups. As each ResourceGroup has a latch that protects the modifications for resources in
+ * that group, the size of a ResourceGroupTable determines the maximal number of lock requests that can concurrently
+ * be served by a ConcurrentLockManager.
+ *
+ * @see ResourceGroup
+ * @see ConcurrentLockManager
+ */
+
+class ResourceGroupTable {
+    public final int size;
+
+    private ResourceGroup[] table;
+
+    public ResourceGroupTable(int size) {
+        this.size = size;
+        final ResourceGroup[] groups = new ResourceGroup[size];
+        for (int slot = 0; slot < groups.length; slot++) {
+            groups[slot] = new ResourceGroup();
+        }
+        table = groups;
+    }
+
+    ResourceGroup get(int dId, int entityHashValue) {
+        // TODO ensure good properties of hash function
+        // Math.abs leaves Integer.MIN_VALUE negative; that single value is clamped to 0
+        final int hash = Math.abs(dId ^ entityHashValue);
+        return table[(hash < 0 ? 0 : hash) % size];
+    }
+
+    ResourceGroup get(int i) {
+        return table[i];
+    }
+
+    public void getAllLatches() {
+        // acquires every group's latch, one after another in slot order
+        for (ResourceGroup group : table) {
+            group.getLatch();
+        }
+    }
+
+    public void releaseAllLatches() {
+        // releases in the same ascending slot order in which getAllLatches() acquires
+        for (ResourceGroup group : table) {
+            group.releaseLatch();
+        }
+    }
+
+    public StringBuilder append(StringBuilder sb) {
+        return append(sb, false);
+    }
+
+    public StringBuilder append(StringBuilder sb, boolean detail) {
+        int slot = 0;
+        for (ResourceGroup group : table) {
+            // detail prints the whole group, otherwise only its first resource index
+            final Object shown = detail ? group : group.firstResourceIndex;
+            sb.append(slot++).append(" : ").append(shown).append('\n');
+        }
+        return sb;
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
new file mode 100644
index 0000000..90c1f69
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+
+import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+/**
+ * Creates a JSON serialization of the lock table of the ConcurrentLockManager organized by resource. I.e. the
+ * serialization will contain all resources for which lock requests are recorded in the table - along with a list of
+ * the requests for each resource.
+ *
+ * @see ConcurrentLockManager
+ */
+public class ResourceTablePrinter implements TablePrinter {
+    private ResourceGroupTable table;
+    private ResourceArenaManager resArenaMgr;
+    private RequestArenaManager reqArenaMgr;
+    private JobArenaManager jobArenaMgr;
+
+    ResourceTablePrinter(ResourceGroupTable table,
+                         ResourceArenaManager resArenaMgr,
+                         RequestArenaManager reqArenaMgr,
+                         JobArenaManager jobArenaMgr) {
+        this.table = table;
+        this.resArenaMgr = resArenaMgr;
+        this.reqArenaMgr = reqArenaMgr;
+        this.jobArenaMgr = jobArenaMgr;
+    }
+
+    public StringBuilder append(StringBuilder sb) {
+        table.getAllLatches();
+        sb.append("[\n");
+        int i = 0;
+        long res = -1;
+        while (res == -1 && i < table.size) {
+            res = table.get(i++).firstResourceIndex.get();
+        }
+        while (i < table.size) {
+            sb = appendResource(sb, res);
+            res = resArenaMgr.getNext(res);
+            while (res == -1 && i < table.size) {
+                res = table.get(i++).firstResourceIndex.get();
+            }
+            if (res == -1) {
+                sb.append("\n");
+                break;
+            } else {
+                sb.append(",\n");
+            }
+        }
+        table.releaseAllLatches();
+        return sb.append("]");
+    }
+
+    StringBuilder appendResource(StringBuilder sb, long res) {
+        sb.append("{ \"dataset\": ").append(resArenaMgr.getDatasetId(res));
+        sb.append(", \"hash\": ").append(resArenaMgr.getPkHashVal(res));
+        sb.append(", \"max mode\": ").append(string(resArenaMgr.getMaxMode(res)));
+        long lastHolder = resArenaMgr.getLastHolder(res);
+        if (lastHolder != -1) {
+            sb = appendRequests(sb.append(", \"holders\": "), lastHolder);
+        }
+        long firstUpgrader = resArenaMgr.getFirstUpgrader(res);
+        if (firstUpgrader != -1) {
+            sb = appendRequests(sb.append(", \"upgraders\": "), firstUpgrader);
+        }
+        long firstWaiter = resArenaMgr.getFirstWaiter(res);
+        if (firstWaiter != -1) {
+            sb = appendRequests(sb.append(", \"waiters\": "), firstWaiter);
+        }
+        return sb.append(" }");
+    }
+
+    StringBuilder appendRequests(StringBuilder sb, long req) {
+        sb.append("[ ");
+        while (req != -1) {
+            appendRequest(sb, req);
+            req = reqArenaMgr.getNextRequest(req);
+            sb.append(req == -1 ? " ]" : ", ");
+        }
+        return sb;
+    }
+
+    StringBuilder appendRequest(StringBuilder sb, long req) {
+        long job = reqArenaMgr.getJobSlot(req);
+        sb.append("{ \"job\": ").append(jobArenaMgr.getJobId(job));
+        sb.append(", \"mode\": \"").append(string(reqArenaMgr.getLockMode(req)));
+        return sb.append("\" }");
+    }
+
+    private static final String string(int lockMode) {
+        return LockMode.toString((byte) lockMode);
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
new file mode 100644
index 0000000..2b4260b
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+public interface TablePrinter { // a printer that writes into a caller-supplied StringBuilder
+    StringBuilder append(StringBuilder sb); // the ResourceTablePrinter implementation returns sb itself
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
new file mode 100644
index 0000000..8171f77
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionContext;
+
+public class WaitInterruptedException extends ACIDException { // presumably signals an interrupted wait - confirm
+    public WaitInterruptedException(ITransactionContext txnContext, String message, Throwable cause) {
+        super(txnContext, message, cause); // all three arguments are forwarded unchanged
+    }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index d371e94..6650ac6 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -54,7 +54,7 @@
         this.id = id;
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
-        this.lockManager = new ConcurrentLockManager(this);
+        this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
 
         AsterixReplicationProperties asterixReplicationProperties = null;
         if (asterixAppRuntimeContextProvider != null) {
@@ -108,4 +108,4 @@
         return id;
     }
 
-}
\ No newline at end of file
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
new file mode 100644
index 0000000..5cabd04
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
@@ -0,0 +1,398 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.asterix.transaction.management.service.locking;
+
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Logger;
+
+import static org.apache.asterix.transaction.management.service.locking.Request.Kind;
+import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LockManagerUnitTest {
+
+    public static int LOCK_MGR_SHRINK_TIMER = 5000;
+    public static int LOCK_MGR_ARENAS = 2;
+    public static int LOCK_MGR_TABLE_SIZE = 10;
+
+    static int INITIAL_TIMESTAMP = 0;
+    static long COORDINATOR_SLEEP = 20; // handed to Thread.sleep in execute(), i.e. milliseconds
+    static int TIMEOUT_MS = 100; // not read in this class; presumably used by Locker - confirm
+
+    static {
+        Logger.getLogger(ConcurrentLockManager.class.getName()).addHandler(new ConsoleHandler());
+    }
+
+    Map<Integer, ITransactionContext> jobMap;
+    ILockManager lockMgr;
+
+    // set to e.g. System.err to get some output
+    PrintStream out = System.out;
+    PrintStream err = null; //System.err;
+
+    //--------------------------------------------------------------------
+    // JUnit methods
+    //--------------------------------------------------------------------
+
+    @Before
+    public void setUp() throws Exception {
+        jobMap = new HashMap<>();
+        lockMgr = new ConcurrentLockManager(LOCK_MGR_SHRINK_TIMER, LOCK_MGR_ARENAS, LOCK_MGR_TABLE_SIZE);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        lockMgr = null;
+        jobMap = null;
+    }
+
+    @Test
+    public void testSimpleSharedUnlock() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.UNLOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.UNLOCK, j(1), d(1), e(-1), LockMode.IS));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testSimpleSharedRelease() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testReacquire() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testInstant() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.INSTANT_LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.PRINT));
+        reqs.add(req(Kind.INSTANT_LOCK, j(3), d(1), e(1), LockMode.S));
+        expectError(execute(reqs), j(3), WaitInterruptedException.class);
+    }
+
+    @Test
+    public void testTry() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.TRY_LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.PRINT));
+        reqs.add(req(Kind.TRY_LOCK, j(3), d(1), e(1), LockMode.X));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testInstantTry() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.INSTANT_LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.PRINT));
+        reqs.add(req(Kind.INSTANT_TRY_LOCK, j(3), d(1), e(1), LockMode.S));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testDeadlock() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(2), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(2), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reqs.add(req(Kind.RELEASE, j(2)));
+        expectError(execute(reqs), j(1), ACIDException.class);
+    }
+
+    @Test
+    public void testUpgrade() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reportErrors(execute(reqs));
+    }
+
+    @Test
+    public void testUpgradeDeadlock() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.PRINT));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reqs.add(req(Kind.RELEASE, j(2)));
+        expectError(execute(reqs), j(2), ACIDException.class);
+    }
+
+    @Test
+    /**
+     * Runs into a time-out and j(1) gets interrupted by
+     * the test. This scenario happens only in this test as there
+     * is additional synchronization between the locking threads
+     * through the coordinator.
+     */
+    public void testTimeout() throws Exception {
+        List<Request> reqs = new ArrayList<>();
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+        reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+        reqs.add(req(Kind.RELEASE, j(1)));
+        reqs.add(req(Kind.RELEASE, j(2)));
+        // this runs into a time-out and j(1) gets interrupted
+        expectError(execute(reqs), j(1), WaitInterruptedException.class);
+    }
+
+    //--------------------------------------------------------------------
+    // Helper methods
+    //--------------------------------------------------------------------
+
+    /**
+     * Executes a list of requests where
+     * a) each job runs in a different thread and
+     * b) the threads/jobs are synchronized
+     * The synchronization ensures that the requests are submitted to the
+     * LockManager in list order, however they are fulfilled in the order
+     * decided by the LockManager
+     *
+     * @param reqs a list of requests that will be executed in order
+     * @return a map of (JobId, exception) pairs that can either be handled
+     * by the test or thrown using #reportErrors
+     */
+    private Map<String, Throwable> execute(List<Request> reqs) throws InterruptedException {
+        if (err != null) {
+            err.println("*** start ***");
+        }
+        final AtomicInteger timeStamp = new AtomicInteger(INITIAL_TIMESTAMP);
+        Set<Locker> lockers = createLockers(reqs, timeStamp);
+        Map<String, Thread> threads = startThreads(lockers);
+
+        int coordinatorTime = timeStamp.get();
+        while (active(lockers)) {
+            if (err != null) {
+                err.println("coordinatorTime = " + coordinatorTime);
+            }
+            if (coordinatorTime == timeStamp.get()) { // shared time stamp unchanged since the last look
+                Thread.sleep(COORDINATOR_SLEEP);
+                if (coordinatorTime == timeStamp.get()) { // still unchanged after sleeping
+                    Locker timedOut = timedOut(lockers);
+                    if (timedOut != null) {
+                        if (err != null) {
+                            err.println(timedOut.name + " timed out");
+                        }
+                        break;
+                    }
+                }
+            }
+            coordinatorTime = timeStamp.get();
+        }
+        Map<String, Throwable> result = stopThreads(lockers, threads);
+        return result;
+    }
+
+    private boolean active(Set<Locker> lockers) { // true as soon as one locker reports active()
+        for (Locker locker : lockers) {
+            if (locker.active()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private Locker timedOut(Set<Locker> lockers) { // first locker reporting timedOut(), or null if none does
+        for (Locker locker : lockers) {
+            if (locker.timedOut()) {
+                return locker;
+            }
+        }
+        return null;
+    }
+
+    private Set<Locker> createLockers(List<Request> reqs, AtomicInteger timeStamp) {
+        Set<Locker> lockers = new HashSet<>();
+        lockers.add(new Locker(lockMgr, null, reqs, timeStamp, err)); // one extra Locker without a txn context
+        for (ITransactionContext txnCtx : jobMap.values()) {
+            Locker locker = new Locker(lockMgr, txnCtx, reqs, timeStamp, err);
+            lockers.add(locker);
+        }
+        return lockers;
+    }
+
+    private Map<String, Thread> startThreads(Set<Locker> lockers) { // one started thread per locker, keyed by name
+        Map<String, Thread> threads = new HashMap<>(lockers.size());
+        for (Locker locker : lockers) {
+            Thread t = new Thread(locker, locker.name);
+            threads.put(locker.name, t);
+            t.start();
+        }
+        return threads;
+    }
+
+    private Map<String, Throwable> stopThreads(Set<Locker> lockers, Map<String, Thread> threads) throws
+            InterruptedException {
+        Map<String, Throwable> result = new HashMap<>();
+        for (Locker locker : lockers) {
+            stopThread(threads.get(locker.name));
+            List<Throwable> errors = locker.getErrors();
+            if (errors != null) {
+                errors.forEach(error -> result.put(locker.name, error)); // a later error replaces an earlier one
+            }
+        }
+        return result;
+    }
+
+    private void stopThread(Thread t) throws InterruptedException {
+        if (err != null) {
+            err.println("stopping " + t.getName() + " " + t.getState());
+        }
+        boolean done = false;
+        while (!done) { // keeps interrupting until the thread is seen as NEW, RUNNABLE or TERMINATED
+            switch (t.getState()) {
+                case NEW:
+                case RUNNABLE:
+                case TERMINATED:
+                    done = true;
+                    break;
+                default:
+                    if (err != null) {
+                        err.println("interrupting " + t.getName());
+                    }
+                    t.interrupt();
+            }
+        }
+        if (err != null) {
+            err.println("joining " + t.getName());
+        }
+        t.join();
+    }
+
+    /**
+     * throws the first Throwable found in the map.
+     * This is the default way to handle the errors returned by #execute
+     *
+     * @param errors a map of (JobId, exception) pairs
+     */
+    void reportErrors(Map<String, Throwable> errors) {
+        for (String name : errors.keySet()) {
+            throw new AssertionError("job " + name + " caught something", errors.get(name));
+        }
+        out.println("no errors");
+    }
+
+    void printErrors(Map<String, Throwable> errors) {
+        errors.keySet().forEach(name -> out.println("Thread " + name + " caught " + errors.get(name)));
+    }
+
+    /**
+     * gets the error for a specific job from the errors map
+     *
+     * @param errors a map of (JobId, throwable) pairs
+     * @param txnCtx the transaction context of the job whose error is requested
+     * @return throwable for said error
+     */
+    private static Throwable getError(Map<String, Throwable> errors, ITransactionContext txnCtx) {
+        return errors.get(txnCtx.getJobId().toString()); // assumes Locker.name is JobId.toString() - confirm
+    }
+
+    /**
+     * asserts that the error for a specific job from the errors map is of a specific class
+     *
+     * @param errors a map of (JobId, throwable) pairs
+     * @param txnCtx the transaction context of the job whose error is requested
+     * @param clazz  the exception class
+     */
+    private void expectError(Map<String, Throwable> errors, ITransactionContext txnCtx,
+                             Class<? extends Throwable> clazz) throws Exception {
+        Throwable error = getError(errors, txnCtx);
+        if (error == null) {
+            throw new AssertionError("expected " + clazz.getSimpleName() + " for " + txnCtx.getJobId() + ", got no " +
+                    "exception");
+        }
+        if (!clazz.isInstance(error)) {
+            throw new AssertionError(error);
+        }
+        out.println("caught expected " + error);
+    }
+
+    //--------------------------------------------------------------------
+    // Convenience methods to make test description more compact
+    //--------------------------------------------------------------------
+
+    private Request req(final Kind kind, final ITransactionContext txnCtx,
+                        final DatasetId dsId, final int hashValue, final byte lockMode) {
+        return Request.create(kind, txnCtx, dsId, hashValue, lockMode);
+    }
+
+    private Request req(final Kind kind, final ITransactionContext txnCtx) {
+        return Request.create(kind, txnCtx);
+    }
+
+    private Request req(final Kind kind) {
+        return Request.create(kind, out);
+    }
+
+    private static DatasetId d(int id) {
+        return new DatasetId(id);
+    }
+
+    private static int e(int i) {
+        return i;
+    }
+
+    private ITransactionContext j(int jId) { // one mocked context per job id, created on first use and cached
+        if (!jobMap.containsKey(jId)) {
+            ITransactionContext mockTxnContext = mock(ITransactionContext.class);
+            when(mockTxnContext.getJobId()).thenReturn(new JobId(jId));
+            jobMap.put(jId, mockTxnContext);
+        }
+        return jobMap.get(jId);
+    }
+
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
new file mode 100644
index 0000000..164fc07
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
@@ -0,0 +1,187 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.junit.Assert;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Executes a sequence of lock requests against an ILockManager.
+ * Lockers are run by different threads in the LockManagerUnitTest.
+ *
+ * @see ILockManager
+ * @see LockManagerUnitTest
+ */
+class Locker implements Runnable {
+
+    /** Display name: the job id of the transaction, or "admin" if this Locker has no transaction context. */
+    public String name;
+
+    private final ILockManager lockMgr;
+
+    /** The requests of this Locker's transaction, in the order in which they have to be started. */
+    private final List<Requester> requests;
+    /** Cursor into {@link #requests}; only accessed while holding this Locker's monitor. */
+    private final Iterator<Requester> reqIter;
+    /** The request that is currently waited for or executed; reset to null once the request loop completes. */
+    private volatile Requester curReq;
+    /**
+     * Wall-clock time in milliseconds at which the current request was started, or a non-positive value while
+     * no request is executing. Written by the thread that executes {@link #run()} and read by
+     * {@link #timedOut()} on other threads, hence volatile.
+     */
+    private volatile long reqStart;
+
+    private final AtomicInteger globalTime;
+    /** Lazily created list of the errors encountered so far; null as long as no error was recorded. */
+    private List<Throwable> errors;
+
+    private final PrintStream err;
+
+    /**
+     * @param lockMgr the ILockManager to send requests to
+     * @param txnCtx the ITransactionContext that identifies the transaction that this Locker represents
+     * @param allRequests an ordered list of lock requests for multiple transactions, this Locker will only execute
+     *                    requests for the transaction identified by txnCtx
+     * @param time a global timestamp that is used to synchronize different lockers to ensure that requests are started
+     *             in the order given in allRequests
+     * @param err a stream to write log/error information to
+     *
+     * @see Request
+     */
+    Locker(ILockManager lockMgr, ITransactionContext txnCtx, List<Request> allRequests, AtomicInteger time,
+           PrintStream err) {
+        this.name = txnCtx == null ? "admin" : txnCtx.getJobId().toString();
+        this.lockMgr = lockMgr;
+
+        this.requests = new LinkedList<>();
+        for (int pos = 0; pos < allRequests.size(); ++pos) {
+            Request req = allRequests.get(pos);
+            // identity comparison: the position in allRequests becomes the start time of the request
+            if (req.txnCtx == txnCtx) {
+                requests.add(new Requester(pos, req));
+            }
+        }
+        this.reqIter = requests.iterator();
+        this.globalTime = time;
+        this.err = err;
+    }
+
+    private boolean hasErrors() {
+        return errors != null && !errors.isEmpty();
+    }
+
+    /**
+     * @return the errors recorded so far, or null if no error has been recorded
+     */
+    synchronized List<Throwable> getErrors() {
+        return errors;
+    }
+
+    private synchronized void addError(Throwable error) {
+        log("caught " + error);
+        if (this.errors == null) {
+            this.errors = Collections.synchronizedList(new ArrayList<Throwable>());
+        }
+        this.errors.add(error);
+    }
+
+    /**
+     * @return true if no error has been recorded and there is a pending or current request
+     */
+    public synchronized boolean active() {
+        return !hasErrors() && (reqIter.hasNext() || curReq != null);
+    }
+
+    /**
+     * @return true if a request is executing and was started more than LockManagerUnitTest.TIMEOUT_MS
+     *         milliseconds ago
+     */
+    public synchronized boolean timedOut() {
+        // read the start time exactly once: the running thread may reset it to -1 concurrently and a second
+        // read would then turn "no request executing" into a huge elapsed time
+        final long start = reqStart;
+        return start > 0 && (currentTime() - start) > LockManagerUnitTest.TIMEOUT_MS;
+    }
+
+    /**
+     * Moves on to the next request unless an error was recorded or no request is left.
+     * Holds the monitor so that {@link #active()} never observes the iterator while it is being advanced.
+     *
+     * @return true if {@link #curReq} was set to the next request, false if the request loop has to stop
+     */
+    private synchronized boolean advance() {
+        if (hasErrors() || !reqIter.hasNext()) {
+            return false;
+        }
+        curReq = reqIter.next();
+        return true;
+    }
+
+    /**
+     * Executes the requests of this Locker one after the other. Each request is started when the global time
+     * equals the time of the request; starting a request increments the global time. The loop stops after the
+     * first recorded error, when the thread is interrupted, or when all requests were executed.
+     */
+    @Override
+    public void run() {
+        log("running");
+        boolean interrupted = false;
+        try {
+            while (advance()) {
+                final Requester req = curReq;
+                int localTime = globalTime.get();
+                while (localTime < req.time) {
+                    Thread.sleep(10);
+                    localTime = globalTime.get();
+                }
+                if (localTime != req.time) {
+                    throw new AssertionError("missed time for request " + req);
+                }
+                log("will exec at t=" + localTime + " " + req);
+                try {
+                    reqStart = currentTime();
+                    Assert.assertEquals(localTime, globalTime.getAndIncrement());
+                    log("incremented");
+                    req.setResult(req.request.execute(lockMgr) ? Requester.SUCCESS : Requester.FAIL);
+                } catch (ACIDException e) {
+                    req.setResult(Requester.ERROR);
+                    addError(e);
+                } finally {
+                    reqStart = -1;
+                }
+                log("time " + localTime);
+            }
+            curReq = null;
+        } catch (InterruptedException ie) {
+            log("got interrupted");
+            interrupted = true;
+        } catch (Throwable e) {
+            if (err != null) {
+                e.printStackTrace(err);
+            }
+            addError(e);
+        }
+        log("done");
+        if (interrupted) {
+            // restore the interrupt status (after the last log output) so that the owner of the thread sees it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void log(String msg) {
+        if (err != null) {
+            err.println(Thread.currentThread().getName() + " " + msg);
+        }
+    }
+
+    /**
+     * @return the current wall-clock time in milliseconds; kept as a long so that elapsed times do not wrap
+     */
+    private static long currentTime() {
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+        return "[" + name + "]" + curReq;
+    }
+}
+
+/**
+ * Pairs a {@link Request} with the time at which it has to be started and with the result of its execution.
+ */
+class Requester {
+
+    /** Initial result: the request has not produced a result yet. */
+    public static final byte NONE = -1;
+    /** The execution of the request returned false. */
+    public static final byte FAIL = 0;
+    /** The execution of the request returned true. */
+    public static final byte SUCCESS = 1;
+    /** The execution of the request ended with an ACIDException. */
+    public static final byte ERROR = 2;
+
+    /** Position of the request in the list of all requests; the Locker starts it at this global time. */
+    int time;
+    Request request;
+    /** One of {@link #NONE}, {@link #FAIL}, {@link #SUCCESS} or {@link #ERROR}. */
+    byte result = NONE;
+
+    Requester(int time, Request request) {
+        this.time = time;
+        this.request = request;
+    }
+
+    void setResult(byte res) {
+        result = res;
+    }
+
+    @Override
+    public String toString() {
+        return request.toString() + " t=" + time;
+    }
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
new file mode 100644
index 0000000..c7e0c42
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
@@ -0,0 +1,166 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+
+import java.io.PrintStream;
+
+/**
+ * Represents a lock request for testing.
+ */
+abstract class Request {
+    /** the kind of a request */
+    enum Kind {
+        /** requests an instant-try-lock */
+        INSTANT_TRY_LOCK,
+        /** requests an instant-lock */
+        INSTANT_LOCK,
+        /** requests a lock */
+        LOCK,
+        /** prints a JSON representation of the lock table by entity */
+        PRINT,
+        /** releases all locks */
+        RELEASE,
+        /** requests a try-lock */
+        TRY_LOCK,
+        /** unlocks a lock */
+        UNLOCK
+    }
+
+    /** what this request does when it is executed */
+    Kind kind;
+    /** the transaction on whose behalf the request is executed; null for requests created with a stream */
+    ITransactionContext txnCtx;
+
+    Request(Kind kind, ITransactionContext txnCtx) {
+        this.kind = kind;
+        this.txnCtx = txnCtx;
+    }
+
+    /**
+     * Renders a resource-level request as "jobId:KIND:datasetId:hashValue:lockMode".
+     */
+    String asString(final Kind kind, final ITransactionContext txnCtx,
+                    final DatasetId dsId, final int hashValue, final byte lockMode) {
+        final StringBuilder text = new StringBuilder();
+        text.append(txnCtx.getJobId().toString()).append(":");
+        text.append(kind.name()).append(":");
+        text.append(dsId.getId()).append(":");
+        text.append(hashValue).append(":");
+        text.append(TransactionManagementConstants.LockManagerConstants.LockMode.toString(lockMode));
+        return text.toString();
+    }
+
+    /**
+     * Sends this request to the given lock manager.
+     *
+     * @param lockMgr the lock manager that receives the request
+     * @return the outcome reported by the concrete request
+     * @throws ACIDException if the lock manager call fails
+     */
+    abstract boolean execute(ILockManager lockMgr) throws ACIDException;
+
+    /**
+     * Common base of all requests that address one resource (dataset id and hash value) with a lock mode;
+     * it provides the string representation that those requests share.
+     */
+    private abstract static class ResourceRequest extends Request {
+        final DatasetId dsId;
+        final int hashValue;
+        final byte lockMode;
+
+        ResourceRequest(Kind kind, ITransactionContext txnCtx, DatasetId dsId, int hashValue, byte lockMode) {
+            super(kind, txnCtx);
+            this.dsId = dsId;
+            this.hashValue = hashValue;
+            this.lockMode = lockMode;
+        }
+
+        @Override
+        public String toString() {
+            return asString(kind, txnCtx, dsId, hashValue, lockMode);
+        }
+    }
+
+    /**
+     * Creates a request that addresses one resource. Supported kinds are INSTANT_TRY_LOCK, INSTANT_LOCK, LOCK,
+     * TRY_LOCK and UNLOCK; every other kind is rejected with an AssertionError.
+     */
+    static Request create(final Kind kind, final ITransactionContext txnCtx,
+                          final DatasetId dsId, final int hashValue, final byte lockMode) {
+        switch (kind) {
+            case INSTANT_TRY_LOCK:
+                return new ResourceRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        return lockMgr.instantTryLock(dsId, hashValue, lockMode, txnCtx);
+                    }
+                };
+            case TRY_LOCK:
+                return new ResourceRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        return lockMgr.tryLock(dsId, hashValue, lockMode, txnCtx);
+                    }
+                };
+            case INSTANT_LOCK:
+                return new ResourceRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        lockMgr.instantLock(dsId, hashValue, lockMode, txnCtx);
+                        return true;
+                    }
+                };
+            case LOCK:
+                return new ResourceRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        lockMgr.lock(dsId, hashValue, lockMode, txnCtx);
+                        return true;
+                    }
+                };
+            case UNLOCK:
+                return new ResourceRequest(kind, txnCtx, dsId, hashValue, lockMode) {
+                    @Override
+                    boolean execute(ILockManager lockMgr) throws ACIDException {
+                        lockMgr.unlock(dsId, hashValue, lockMode, txnCtx);
+                        return true;
+                    }
+                };
+            default:
+                throw new AssertionError("Illegal Request Kind " + kind);
+        }
+    }
+
+    /**
+     * Creates a request that only needs a transaction context. The only supported kind is RELEASE, which
+     * calls releaseLocks for the transaction; every other kind is rejected with an AssertionError.
+     */
+    static Request create(final Kind kind, final ITransactionContext txnCtx) {
+        if (kind != Kind.RELEASE) {
+            throw new AssertionError("Illegal Request Kind " + kind);
+        }
+        return new Request(kind, txnCtx) {
+            @Override
+            boolean execute(ILockManager lockMgr) throws ACIDException {
+                lockMgr.releaseLocks(txnCtx);
+                return true;
+            }
+
+            @Override
+            public String toString() {
+                return txnCtx.getJobId().toString() + ":" + kind.name();
+            }
+        };
+    }
+
+    /**
+     * Creates a request without a transaction context that writes to the given stream. The only supported
+     * kind is PRINT; every other kind is rejected with an AssertionError.
+     * Executing the request returns false if the stream is null or if the lock manager is not a
+     * ConcurrentLockManager, true after the output of printByResource() was written.
+     */
+    static Request create(final Kind kind, final PrintStream out) {
+        if (kind != Kind.PRINT) {
+            throw new AssertionError("Illegal Request Kind " + kind);
+        }
+        return new Request(kind, null) {
+            @Override
+            boolean execute(ILockManager lockMgr) throws ACIDException {
+                if (out == null) {
+                    return false;
+                }
+                if (lockMgr instanceof ConcurrentLockManager) {
+                    final ConcurrentLockManager concurrentLockMgr = (ConcurrentLockManager) lockMgr;
+                    out.print(concurrentLockMgr.printByResource());
+                    return true;
+                }
+                out.print("cannot print");
+                return false;
+            }
+
+            @Override
+            public String toString() {
+                return kind.name();
+            }
+        };
+    }
+}