Merge branch 'master' into westmann/locks
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
index 79221c1..13a80f1 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -67,7 +67,7 @@
     
     String recordManagerTemplate = "RecordManager.java"; 
     String arenaManagerTemplate = "ArenaManager.java";
-    String[] supportTemplates = { "Stats.java", "AllocInfo.java", "TypeUtil.java" };
+    String[] supportTemplates = { "RecordManagerStats.java", "AllocInfo.java", "TypeUtil.java" };
     
     private Map<String, RecordType> typeMap;
 
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
index 4abb3af..6df032e 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -114,7 +114,7 @@
         return append(new StringBuilder()).toString();
     }
 
-    public Stats addTo(Stats s) {
+    public RecordManagerStats addTo(RecordManagerStats s) {
         s.arenas += noArenas;
         for (int i = 0; i < noArenas; ++i) {
             arenas[i].addTo(s);
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index 9afc673..39a344e 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -206,7 +206,7 @@
         return append(new StringBuilder()).toString();
     }
 
-    public Stats addTo(Stats s) {
+    public RecordManagerStats addTo(RecordManagerStats s) {
         final int size = buffers.size();
         s.buffers += size;
         s.slots += size * NO_SLOTS;
@@ -302,7 +302,7 @@
             return append(new StringBuilder()).toString();
         }
         
-        public void addTo(Stats s) {
+        public void addTo(RecordManagerStats s) {
             if (isInitialized()) {
                 s.items += occupiedSlots;
             }
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManagerStats.java
similarity index 93%
rename from asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
rename to asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManagerStats.java
index c136101..5842ed6 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManagerStats.java
@@ -1,6 +1,6 @@
 package @PACKAGE@;
 
-public class Stats {
+public class RecordManagerStats {
     int arenas  = 0;
     int buffers = 0;
     int slots   = 0;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 1f65f7f..d3e1b3b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -73,7 +73,7 @@
 public class MetadataManager implements IMetadataManager {
     private static final int INITIAL_SLEEP_TIME = 64;
     private static final int RETRY_MULTIPLIER = 4;
-    private static final int MAX_RETRY_COUNT = 6;
+    private static final int MAX_RETRY_COUNT = 10;
     // Set in init().
     public static MetadataManager INSTANCE;
     private final MetadataCache cache = new MetadataCache();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 1096da3..6bb6580 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -347,6 +347,7 @@
         ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
                 .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
                 index.getDatasetId().getId());
+        final String path = file.getFile().getPath();
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(
                     virtualBufferCaches,
@@ -369,11 +370,11 @@
             ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
-            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
-                    .getPath(), 0));
+            localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
             indexLifecycleManager.register(resourceID, lsmBtree);
         } else {
-            resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
+            final LocalResource resource = localResourceRepository.getResourceByName(path);
+            resourceID = resource.getResourceId();
             lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index 9957edf..8de7ca5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -36,9 +36,8 @@
 
     @Override
     public boolean proceed(ITupleReference tuple) throws HyracksDataException {
-        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
-            return lockManager.tryLock(datasetId, pkHash, LockMode.S, txnCtx);
+            return lockManager.tryLock(datasetId, -1, LockMode.S, txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -46,9 +45,8 @@
 
     @Override
     public void reconcile(ITupleReference tuple) throws HyracksDataException {
-        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
-            lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx);
+            lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -56,12 +54,7 @@
 
     @Override
     public void cancel(ITupleReference tuple) throws HyracksDataException {
-        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
-        try {
-            lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
-        } catch (ACIDException e) {
-            throw new HyracksDataException(e);
-        }
+        //no op
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 6c6b65f..51307e6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -42,9 +42,11 @@
  */
 public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
 
-    private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
-
-    public static final boolean IS_DEBUG_MODE = false;//true
+    private static final Logger LOGGER
+        = Logger.getLogger(ConcurrentLockManager.class.getName());
+    private static final Level LVL = Level.FINER;
+    
+    public static final boolean DEBUG_MODE = false;//true
 
     private TransactionSubsystem txnSubsystem;
     private ResourceGroupTable table;
@@ -53,11 +55,13 @@
     private JobArenaManager jobArenaMgr;
     private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
     private ThreadLocal<DatasetLockCache> dsLockCache;
-        
+    
+    private LockManagerStats stats = new LockManagerStats(10000); 
+    
     enum LockAction {
         ERR(false, false),
         GET(false, false),
-        UPD(false, true), // special version of GET that updates the max lock mode
+        UPD(false, true), // version of GET that updates the max lock mode
         WAIT(true, false),
         CONV(true, true) // convert (upgrade) a lock (e.g. from S to X)
         ;
@@ -107,22 +111,22 @@
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        stats.lock();
         
         final int dsId = datasetId.getId();        
         final int jobId = txnContext.getJobId().getId();
         
         if (entityHashValue != -1) {
-            // get the intention lock on the dataset, if we want to lock an individual item
-            final byte dsLockMode = LockMode.intentionMode(lockMode);
-            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
-                lock(datasetId, -1, dsLockMode, txnContext);
-                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
+        } else {
+            if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
+                return;
             }
         }
 
         final long jobSlot = findOrAllocJobSlot(jobId);
         
-        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        final ResourceGroup group = table.get(dsId, entityHashValue);
         group.getLatch();
         try {
             validateJob(txnContext);
@@ -149,6 +153,9 @@
                         throw new IllegalStateException();
                 }
             }
+            if (entityHashValue == -1) {
+                dsLockCache.get().put(jobId, dsId, lockMode);
+            }
         } finally {
             group.releaseLatch();
         }
@@ -208,20 +215,18 @@
     public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        stats.instantLock();
         
         final int dsId = datasetId.getId();        
         final int jobId = txnContext.getJobId().getId();
         
         if (entityHashValue != -1) {
-            // get the intention lock on the dataset, if we want to lock an individual item
-            final byte dsLockMode = LockMode.intentionMode(lockMode);
-            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
-                lock(datasetId, -1, dsLockMode, txnContext);
-                dsLockCache.get().put(jobId, dsId, dsLockMode);
-            }
+            lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
+        } else {
+            throw new UnsupportedOperationException("instant locks are not supported on datasets");
         }
 
-        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        final ResourceGroup group = table.get(dsId, entityHashValue);
         if (group.firstResourceIndex.get() == -1l) {
             validateJob(txnContext);
             // if we do not have a resource in the group, we know that the
@@ -265,6 +270,7 @@
         } finally {
             if (reqSlot != -1) {
                 // deallocate request, if we allocated one earlier
+                if (DEBUG_MODE) LOGGER.finer("del req slot " + TypeUtil.Global.toString(reqSlot));
                 reqArenaMgr.deallocate(reqSlot);
             }
             group.releaseLatch();
@@ -275,24 +281,24 @@
     public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
             throws ACIDException {
         log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        stats.tryLock();
         
         final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
 
         if (entityHashValue != -1) {
-            // get the intention lock on the dataset, if we want to lock an individual item
-            final byte dsLockMode = LockMode.intentionMode(lockMode);
-            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
-                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
-                    return false;
-                }
-                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+                return false;
+            }
+        } else {
+            if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
+                return true;
             }
         }
 
         final long jobSlot = findOrAllocJobSlot(jobId);
         
-        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        final ResourceGroup group = table.get(dsId, entityHashValue);
         group.getLatch();
 
         try {
@@ -300,7 +306,7 @@
 
             final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
             final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
-            
+
             final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
             switch (act) {
                 case UPD:
@@ -308,6 +314,9 @@
                     // no break
                 case GET:
                     addHolder(reqSlot, resSlot, jobSlot);
+                    if (entityHashValue == -1) {
+                        dsLockCache.get().put(jobId, dsId, lockMode);
+                    }
                     return true;
                 case WAIT:
                 case CONV:
@@ -318,7 +327,7 @@
         } finally {
             group.releaseLatch();
         }
-        
+
         // if we did acquire the dataset lock, but not the entity lock, we keep
         // it anyway and clean it up at the end of the job
     }
@@ -327,22 +336,20 @@
     public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
             ITransactionContext txnContext) throws ACIDException {
         log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        stats.instantTryLock();
         
         final int dsId = datasetId.getId();
         final int jobId = txnContext.getJobId().getId();
 
         if (entityHashValue != -1) {
-            // get the intention lock on the dataset, if we want to lock an individual item
-            final byte dsLockMode = LockMode.intentionMode(lockMode);
-            if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
-                if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
-                    return false;
-                }
-                dsLockCache.get().put(jobId, dsId, dsLockMode);
+            if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+                return false;
             }
+        } else {
+            throw new UnsupportedOperationException("instant locks are not supported on datasets");
         }
 
-        final ResourceGroup group = table.get(datasetId, entityHashValue);
+        final ResourceGroup group = table.get(dsId, entityHashValue);
         if (group.firstResourceIndex.get() == -1l) {
             validateJob(txnContext);
             // if we do not have a resource in the group, we know that the
@@ -382,23 +389,29 @@
     @Override
     public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
         log("unlock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+        final int jobId = txnContext.getJobId().getId();
+        final long jobSlot = jobIdSlotMap.get(jobId);
+        final int dsId = datasetId.getId();
+        unlock(dsId, entityHashValue, lockMode, jobSlot);
+    }
 
-        ResourceGroup group = table.get(datasetId, entityHashValue);
+    private void unlock(int dsId, int entityHashValue, byte lockMode, long jobSlot) throws ACIDException {
+        log("unlock", dsId, entityHashValue, lockMode, null);
+        stats.unlock();
+
+        ResourceGroup group = table.get(dsId, entityHashValue);
         group.getLatch();
         try {
 
-            int dsId = datasetId.getId();
             long resource = findResourceInGroup(group, dsId, entityHashValue);
             if (resource < 0) {
                 throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
             }
 
-            int jobId = txnContext.getJobId().getId();
-            long jobSlot = findOrAllocJobSlot(jobId);
-
             long holder = removeLastHolder(resource, jobSlot, lockMode);
 
             // deallocate request
+            if (DEBUG_MODE) LOGGER.finer("del req slot " + TypeUtil.Global.toString(holder));
             reqArenaMgr.deallocate(holder);
             // deallocate resource or fix max lock mode
             if (resourceNotUsed(resource)) {
@@ -411,6 +424,7 @@
                     }
                     resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
                 }
+                if (DEBUG_MODE) LOGGER.finer("del res slot " + TypeUtil.Global.toString(resource));
                 resArenaMgr.deallocate(resource);
             } else {
                 final int oldMaxMode = resArenaMgr.getMaxMode(resource);
@@ -432,6 +446,7 @@
     @Override
     public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
         log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
+        stats.releaseLocks();
 
         int jobId = txnContext.getJobId().getId();
         Long jobSlot = jobIdSlotMap.get(jobId);
@@ -439,6 +454,12 @@
             // we don't know the job, so there are no locks for it - we're done
             return;
         }
+        //System.err.println(table.append(new StringBuilder(), true).toString());
+        if (LOGGER.isLoggable(LVL)) {
+            LOGGER.log(LVL, "jobArenaMgr " + jobArenaMgr.addTo(new RecordManagerStats()).toString());
+            LOGGER.log(LVL, "resArenaMgr " + resArenaMgr.addTo(new RecordManagerStats()).toString());
+            LOGGER.log(LVL, "reqArenaMgr " + reqArenaMgr.addTo(new RecordManagerStats()).toString());
+        }
         long holder;
         synchronized (jobArenaMgr) {
             holder = jobArenaMgr.getLastHolder(jobSlot);
@@ -447,28 +468,30 @@
             long resource = reqArenaMgr.getResourceId(holder);
             int dsId = resArenaMgr.getDatasetId(resource);
             int pkHashVal = resArenaMgr.getPkHashVal(resource);
-            unlock(new DatasetId(dsId), pkHashVal, LockMode.ANY, txnContext);
+            unlock(dsId, pkHashVal, LockMode.ANY, jobSlot);
             synchronized (jobArenaMgr) {
                 holder = jobArenaMgr.getLastHolder(jobSlot);
             }
         }
+        if (DEBUG_MODE) LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot));
         jobArenaMgr.deallocate(jobSlot);
-        //System.err.println(table.append(new StringBuilder(), true).toString());        
-        //System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
-        //System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
-        //System.out.println("reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+        jobIdSlotMap.remove(jobId);
+        stats.logCounters(LOGGER, Level.INFO, true);
+        //LOGGER.info(toString());
     }
         
     private long findOrAllocJobSlot(int jobId) {
         Long jobSlot = jobIdSlotMap.get(jobId);
         if (jobSlot == null) {
             jobSlot = new Long(jobArenaMgr.allocate());
+            if (DEBUG_MODE) LOGGER.finer("new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + jobId + ")");
             jobArenaMgr.setJobId(jobSlot, jobId);
             Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
             if (oldSlot != null) {
                 // if another thread allocated a slot for this jobId between
                 // get(..) and putIfAbsent(..), we'll use that slot and
                 // deallocate the one we allocated
+                if (DEBUG_MODE) LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot) + " due to conflict");
                 jobArenaMgr.deallocate(jobSlot);
                 jobSlot = oldSlot;
             }
@@ -487,6 +510,9 @@
             resArenaMgr.setPkHashVal(resSlot, entityHashValue);
             resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
             group.firstResourceIndex.set(resSlot);
+            if (DEBUG_MODE) LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
+        } else {
+            if (DEBUG_MODE) LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
         }
         return resSlot;
     }
@@ -496,6 +522,12 @@
         reqArenaMgr.setResourceId(reqSlot, resSlot);
         reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
         reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+        if (DEBUG_MODE) {
+            LOGGER.finer("new req slot " + TypeUtil.Global.toString(reqSlot)
+                    + " (" + TypeUtil.Global.toString(resSlot)
+                    + ", " + TypeUtil.Global.toString(jobSlot)
+                    + ", " + LockMode.toString(lockMode) + ")");
+        }
         return reqSlot;
     }
 
@@ -538,6 +570,7 @@
     }
     
     private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+        stats.logCounters(LOGGER, Level.INFO, false);
         long resSlot = group.firstResourceIndex.get();
         while (resSlot != -1) {
             // either we already have a lock on this resource or we have a 
@@ -551,7 +584,7 @@
         }
         return -1;        
     }
-    
+
     private void addHolder(long request, long resource, long job) {
         long lastHolder = resArenaMgr.getLastHolder(resource);
         reqArenaMgr.setNextRequest(request, lastHolder);
@@ -766,7 +799,7 @@
     }
 
     private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
-        if (! LOGGER.isLoggable(Level.FINEST)) {
+        if (! LOGGER.isLoggable(LVL)) {
             return;
         }
         StringBuilder sb = new StringBuilder();
@@ -784,7 +817,7 @@
             sb.append(" , jobId : ").append(txnContext.getJobId());            
         }
         sb.append(" }");
-        LOGGER.finest(sb.toString());
+        LOGGER.log(LVL, sb.toString());
     }
 
     private void validateJob(ITransactionContext txnContext) throws ACIDException {
@@ -855,6 +888,7 @@
                 os.write(toString().getBytes());
                 os.flush();
             } catch (IOException e) {
+                LOGGER.warning("caught exception when dumping state of ConcurrentLockManager: " + e.toString());
                 //ignore
             }
         }
@@ -912,9 +946,9 @@
             }
         }
         
-        ResourceGroup get(DatasetId dId, int entityHashValue) {
+        ResourceGroup get(int dId, int entityHashValue) {
             // TODO ensure good properties of hash function
-            int h = Math.abs(dId.getId() ^ entityHashValue);
+            int h = Math.abs(dId ^ entityHashValue);
             if (h < 0) h = 0;
             return table[h % TABLE_SIZE];
         }
@@ -979,6 +1013,7 @@
             try {
                 condition.await();
             } catch (InterruptedException e) {
+                LOGGER.finer("interrupted while wating on ResourceGroup");
                 throw new ACIDException(txnContext, "interrupted", e);
             }
         }
@@ -989,8 +1024,8 @@
         }
         
         void log(String s) {
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest(s + " " + toString());
+            if (LOGGER.isLoggable(LVL)) {
+                LOGGER.log(LVL, s + " " + toString());
             }            
         }
         
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DummyLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DummyLockManager.java
new file mode 100644
index 0000000..691ae16
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DummyLockManager.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+
+/**
+ * A dummy implementation of the ILockManager interface. It assumes that all
+ * requests are successful. It can be used for jobs that are known to be
+ * conflict free, but it'll yield terrible results if there are conflicts.
+ * No state is kept: the void operations do nothing and try-locks return true.
+ *
+ * @author tillw
+ */
+public class DummyLockManager implements ILockManager, ILifeCycleComponent {
+
+    public DummyLockManager(TransactionSubsystem transactionSubsystem) { // the argument is ignored
+    }
+
+    @Override
+    public void start() { // no-op: there is nothing to initialize
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { // no-op: nothing is written, whatever dumpState says
+    }
+
+    @Override
+    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException { // no-op: returns immediately, i.e. the lock is treated as granted
+    }
+
+    @Override
+    public void releaseLocks(ITransactionContext txnContext) throws ACIDException { // no-op: no locks are ever recorded
+    }
+
+    @Override
+    public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException { // no-op: no locks are ever recorded
+    }
+
+    @Override
+    public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext context)
+            throws ACIDException { // no-op: returns immediately
+    }
+
+    @Override
+    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext context)
+            throws ACIDException {
+        return true; // every request is reported as successful
+    }
+
+    @Override
+    public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        return true; // every request is reported as successful
+    }
+
+    @Override
+    public String prettyPrint() throws ACIDException {
+        return "DummyLockManager"; // fixed label; there is no lock table to print
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerStats.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerStats.java
new file mode 100644
index 0000000..14d1775
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerStats.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+final class LockManagerStats { // call counters, one per operation, plus optional periodic logging of the totals
+    private final int loggingPeriod; // modulus used by logCounters(.., .., false)
+    // One counter per operation; each is incremented by exactly one of the hook methods below.
+    private final AtomicLong lCnt = new AtomicLong();   // lock()
+    private final AtomicLong ilCnt = new AtomicLong();  // instantLock()
+    private final AtomicLong tlCnt = new AtomicLong();  // tryLock()
+    private final AtomicLong itlCnt = new AtomicLong(); // instantTryLock()
+    private final AtomicLong ulCnt = new AtomicLong();  // unlock()
+    private final AtomicLong rlCnt = new AtomicLong();  // releaseLocks()
+
+    LockManagerStats(int loggingPeriod) { // NOTE(review): a period of 0 would make the '%' in logCounters throw ArithmeticException
+        this.loggingPeriod = loggingPeriod;
+    }
+    // Increment hooks: each call adds one to the matching counter.
+    final void lock()           { lCnt.incrementAndGet(); }
+    final void instantLock()    { ilCnt.incrementAndGet(); }
+    final void tryLock()        { tlCnt.incrementAndGet(); }
+    final void instantTryLock() { itlCnt.incrementAndGet(); }
+    final void unlock()         { ulCnt.incrementAndGet(); }
+    final void releaseLocks()   { rlCnt.incrementAndGet(); }
+    // Sum of all six counters. NOTE(review): each value is narrowed to int before adding, so the total can wrap around.
+    final int requestSum() {
+        return lCnt.intValue() + ilCnt.intValue() + tlCnt.intValue() 
+                + itlCnt.intValue() + ulCnt.intValue() + rlCnt.intValue();
+    }
+    // Appends the counters to sb in the form "{ lock : n, instantLock : n, ... }" and returns the same builder.
+    final StringBuilder append(StringBuilder sb) {
+        sb.append("{")
+        .append(" lock : ").append(lCnt)
+        .append(", instantLock : ").append(ilCnt)
+        .append(", tryLock : ").append(tlCnt)
+        .append(", instantTryLock : ").append(itlCnt)
+        .append(", unlock : ").append(ulCnt)
+        .append(", releaseLocks : ").append(rlCnt)
+        .append(" }");
+        return sb;
+    }        
+
+    @Override
+    public String toString() {
+        return append(new StringBuilder()).toString(); // same text that append(..) produces
+    }
+    // Logs toString() at lvl when that level is enabled and either 'always' is set or requestSum() is a multiple of loggingPeriod.
+    final void logCounters(final Logger logger, final Level lvl, boolean always) {
+        if (logger.isLoggable(lvl) 
+            && (always || requestSum()  % loggingPeriod == 0)) { // a sum of 0 also counts as a multiple
+            logger.log(lvl, toString());
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 1ed6fba..cec6cd3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -50,6 +50,8 @@
     private final LinkedBlockingQueue<ILogRecord> syncCommitQ;
     private FileChannel fileChannel;
     private boolean stop;
+    private DatasetId reusableDsId;
+    private JobId reusableJobId;
 
     public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
         this.txnSubsystem = txnSubsystem;
@@ -64,6 +66,8 @@
         flushOffset = 0;
         isLastPage = false;
         syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+        reusableDsId = new DatasetId(-1);
+        reusableJobId = new JobId(-1);
     }
 
     ////////////////////////////////////
@@ -193,21 +197,19 @@
         if (endOffset > beginOffset) {
             logPageReader.initializeScan(beginOffset, endOffset);
 
-            DatasetId dsId = new DatasetId(-1);
-            JobId jId = new JobId(-1);
             ITransactionContext txnCtx = null;
 
             LogRecord logRecord = logPageReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
-                    dsId.setId(logRecord.getDatasetId());
-                    jId.setId(logRecord.getJobId());
-                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
-                    txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
+                    reusableJobId.setId(logRecord.getJobId());
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
+                    reusableDsId.setId(logRecord.getDatasetId());
+                    txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
                     txnCtx.notifyOptracker(false);
                 } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
-                    jId.setId(logRecord.getJobId());
-                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+                    reusableJobId.setId(logRecord.getJobId());
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
                     txnCtx.notifyOptracker(true);
                     notifyJobTerminator();
                 }