work in progress
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
new file mode 100644
index 0000000..332167a
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
@@ -0,0 +1,57 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class @E@ArenaManager {
+    
+    private ArrayList<@E@MemoryManager> arenas;
+    private volatile int nextArena; 
+    private ThreadLocal<LocalManager> local;    
+    
+    public @E@ArenaManager() {
+        int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+        arenas = new ArrayList<@E@MemoryManager>(noArenas);
+        local = new ThreadLocal<LocalManager>() {
+            @Override
+            protected LocalManager initialValue() {
+                return getNext();
+            }
+        };
+    }
+    
+    public static int arenaId(int i) {
+        return (i >> 24) & 0xff;
+    }
+
+    public static int localId(int i) {
+        return i & 0xffffff;
+    }
+
+    public synchronized LocalManager getNext() {
+        @E@MemoryManager mgr = arenas.get(nextArena);
+        if (mgr == null) {
+            mgr = new @E@MemoryManager();
+            arenas.set(nextArena, mgr);
+        }
+        LocalManager res = new LocalManager();
+        res.mgr = mgr;
+        res.arenaId = nextArena;
+        nextArena = (nextArena + 1) % arenas.size();
+        return res;
+    }
+    
+    public @E@MemoryManager get(int i) {
+        return arenas.get(i);
+    }
+    
+    public @E@MemoryManager local() {
+        return local.get().mgr;
+    }
+    
+    @METHODS@
+    
+    static class LocalManager {
+        int arenaId;
+        @E@MemoryManager mgr;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
new file mode 100644
index 0000000..7abb71b
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+/**
+ * An implementation of the ILockManager interface for the
+ * specific case of locking protocol with two lock modes: (S) and (X),
+ * where S lock mode is shown by 0, and X lock mode is shown by 1.
+ * 
+ * @author tillw
+ */
+
+public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+
+    public static final boolean IS_DEBUG_MODE = false;//true
+
+    private TransactionSubsystem txnSubsystem;
+
+    private ResourceGroupTable table;
+    private ResourceArenaManager resArenaMgr;
+    private RequestArenaManager reqArenaMgr;
+    private ConcurrentHashMap<Integer, Integer> jobReqMap; // TODO different impl
+        
+    enum LockAction {
+        GET,
+        UPD, // special version of GET that updates the max lock mode
+        WAIT
+    }
+    
+    static LockAction[][] ACTION_MATRIX = {
+        // new    NL              IS               IX                S                X
+        { LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD,  LockAction.UPD  }, // NL
+        { LockAction.WAIT, LockAction.GET,  LockAction.UPD,  LockAction.UPD,  LockAction.WAIT }, // IS
+        { LockAction.WAIT, LockAction.GET,  LockAction.GET,  LockAction.WAIT, LockAction.WAIT }, // IX
+        { LockAction.WAIT, LockAction.GET,  LockAction.WAIT, LockAction.GET,  LockAction.WAIT }, // S
+        { LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT }  // X
+    };
+        
+    public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+        this.txnSubsystem = txnSubsystem;
+        
+        this.table = new ResourceGroupTable();
+        
+        final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties()
+                .getLockManagerShrinkTimer();
+
+        resArenaMgr = new ResourceArenaManager();
+        reqArenaMgr = new RequestArenaManager();
+        jobReqMap = new ConcurrentHashMap<>();
+    }
+
+    public AsterixTransactionProperties getTransactionProperties() {
+        return this.txnSubsystem.getTransactionProperties();
+    }
+
+    @Override
+    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        
+        if (entityHashValue != -1) {
+            // get the intention lock on the dataset, if we want to lock an individual item
+            byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+            lock(datasetId, -1, dsLockMode, txnContext);
+        }
+
+        int dsId = datasetId.getId();
+
+        ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        // 1) Find the resource in the hash table
+        
+        int resSlot = findResourceInGroup(group, dsId, entityHashValue);
+        
+        if (resSlot == -1) {
+            // we don't know about this resource, let's alloc a slot
+            ResourceMemoryManager resMgr = resArenaMgr.local();
+            resSlot = resMgr.allocate();
+            resMgr.setDatasetId(resSlot, datasetId.getId());
+            resMgr.setPkHashVal(resSlot, entityHashValue);
+
+            if (group.firstResourceIndex.get() == -1) {
+                group.firstResourceIndex.set(resSlot);
+            }
+        }
+        
+        // 2) create a request entry
+
+        int jobId = txnContext.getJobId().getId();
+        int reqSlot = reqArenaMgr.local().allocate();
+        reqArenaMgr.setResourceId(reqSlot, resSlot);
+        reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+        reqArenaMgr.setJobId(reqSlot, jobId);
+        
+        Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+        if (headOfJobReqQueue != null) {
+            // TODO make sure this works (even if the job gets removed from the table)
+            while (!jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
+                headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+            }
+        }
+        // this goes across arenas
+        
+        reqArenaMgr.setNextJobRequest(reqSlot, headOfJobReqQueue);
+        reqArenaMgr.setPrevJobRequest(reqSlot, -1);
+        reqArenaMgr.setNextJobRequest(headOfJobReqQueue, reqSlot);
+        
+        // 3) check lock compatibility
+        
+        boolean locked = false;
+        
+        while (! locked) {
+            int curLockMode = resArenaMgr.getMaxMode(resSlot);
+            switch (ACTION_MATRIX[curLockMode][lockMode]) {
+                case UPD:
+                    resArenaMgr.setMaxMode(resSlot, lockMode);
+                    // no break
+                case GET:
+                    addHolderToResource(resSlot, reqSlot);
+                    locked = true;
+                    break;
+                case WAIT:
+                    // TODO can we have more than on upgrader? Or do we need to
+                    // abort if we get a second upgrader?
+                    if (findLastHolderForJob(resSlot, jobId) != -1) {
+                        addUpgraderToResource(resSlot, reqSlot);
+                    } else {
+                        addWaiterToResource(resSlot, reqSlot);
+                    }
+                    try {
+                        group.await();
+                    } catch (InterruptedException e) {
+                        throw new ACIDException(txnContext, "interrupted", e);
+                    }
+                    break;       
+            }
+            
+            // TODO where do we check for deadlocks?
+        }
+        
+        group.releaseLatch();
+        
+        //internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
+    }
+
+    @Override
+    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+
+        ResourceGroup group = table.get(datasetId, entityHashValue);
+        group.getLatch();
+
+        int dsId = datasetId.getId();
+        int resource = findResourceInGroup(group, dsId, entityHashValue);
+        
+        if (resource < 0) {
+            throw new IllegalStateException("resource (" + dsId + ",  " + entityHashValue + ") not found");
+        }
+        
+        int jobId = txnContext.getJobId().getId();
+        // since locking is properly nested, finding the last holder is good enough
+        
+        int holder = resArenaMgr.getLastHolder(resource);
+        if (holder < 0) {
+            throw new IllegalStateException("no holder for resource " + resource);
+        }
+        
+        // remove from the list of holders
+        if (reqArenaMgr.getJobId(holder) == jobId) {
+            int next = reqArenaMgr.getNextRequest(holder);
+            resArenaMgr.setLastHolder(resource, next);
+        } else {
+            int prev = holder;
+            while (prev != -1) {
+                holder = reqArenaMgr.getNextRequest(prev);
+                if (holder == -1) {
+                    throw new IllegalStateException("no holder for resource " + resource + " and job " + jobId);
+                }
+                if (jobId == reqArenaMgr.getJobId(holder)) {
+                    break;
+                }
+                prev = holder;
+            }
+            int next = reqArenaMgr.getNextRequest(holder);
+            reqArenaMgr.setNextRequest(prev, next);
+        }
+        
+        // remove from the list of requests for a job
+        int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+        int nextForJob = reqArenaMgr.getNextJobRequest(holder);
+        if (prevForJob == -1) {
+            if (nextForJob == -1) {
+                jobReqMap.remove(jobId);
+            } else {
+                jobReqMap.put(jobId, nextForJob);
+            }
+        } else {
+            reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+        }
+        if (nextForJob != -1) {
+            reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+        }
+        
+        // deallocate request
+        reqArenaMgr.local().deallocate(holder);
+        // deallocate resource or fix max lock mode
+        if (resourceNotUsed(resource)) {
+            int prev = group.firstResourceIndex.get();
+            if (prev == resource) {
+                group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+            } else {
+                while (resArenaMgr.getNext(prev) != resource) {
+                    prev = resArenaMgr.getNext(prev);
+                }
+                resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+            }
+            resArenaMgr.local().deallocate(resource);
+        } else {
+            final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+            final int newMaxMode = getNewMaxMode(resource, oldMaxMode);
+            resArenaMgr.setMaxMode(resource, newMaxMode);
+            if (oldMaxMode != newMaxMode) {
+                // the locking mode didn't change, current waiters won't be
+                // able to acquire the lock, so we do not need to signal them
+                group.wakeUp();
+            }
+        }
+        group.releaseLatch();
+        
+        // finally remove the intention lock as well
+        if (entityHashValue != -1) {
+            // remove the intention lock on the dataset, if we want to lock an individual item
+            unlock(datasetId, -1, txnContext);
+        }
+        
+        //internalUnlock(datasetId, entityHashValue, txnContext, false, false);
+    }
+    
+    @Override
+    public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+        int jobId = txnContext.getJobId().getId();
+        Integer head = jobReqMap.get(jobId);
+        while (head != null) {
+            int resource = reqArenaMgr.getResourceId(head);
+            int dsId = resArenaMgr.getDatasetId(resource);
+            int pkHashVal = resArenaMgr.getPkHashVal(resource);
+            unlock(new DatasetId(dsId), pkHashVal, txnContext);
+            head = jobReqMap.get(jobId);            
+        }
+    }
+        
+    private int findLastHolderForJob(int resource, int job) {
+        int holder = resArenaMgr.getLastHolder(resource);
+        while (holder != -1) {
+            if (job == reqArenaMgr.getJobId(holder)) {
+                return holder;
+            }
+            holder = reqArenaMgr.getNextRequest(holder);
+        }
+        return -1;
+    }
+    
+    private int findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+        int resSlot = group.firstResourceIndex.get();
+        // we're looking in the local one as that should be set correctly // TODO make sure! 
+        ResourceMemoryManager resMgr = resArenaMgr.local();
+        while (resSlot != -1) {
+            // either we already have a lock on this resource or we have a 
+            // hash collision
+            if (resMgr.getDatasetId(resSlot) == dsId && 
+                    resMgr.getPkHashVal(resSlot) == entityHashValue) {
+                return resSlot;
+            } else {
+                resSlot = resMgr.getNext(resSlot);
+            }
+        }
+        return -1;        
+    }
+    
+    private void addHolderToResource(int resource, int request) {
+        final int lastHolder = resArenaMgr.getLastHolder(resource);
+        reqArenaMgr.setNextRequest(request, lastHolder);
+        resArenaMgr.setLastHolder(resource, request);
+    }
+
+    private void addWaiterToResource(int resource, int request) {
+        int waiter = resArenaMgr.getFirstWaiter(resource);
+        reqArenaMgr.setNextRequest(request, -1);
+        if (waiter == -1) {
+            resArenaMgr.setFirstWaiter(resource, request);
+        } else {
+            appendToRequestQueue(waiter, request);
+        }
+    }
+
+    private void addUpgraderToResource(int resource, int request) {
+        int upgrader = resArenaMgr.getFirstUpgrader(resource);
+        reqArenaMgr.setNextRequest(request, -1);
+        if (upgrader == -1) {
+            resArenaMgr.setFirstUpgrader(resource, request);
+        } else {
+            appendToRequestQueue(upgrader, request);
+        }
+    }
+
+    private void appendToRequestQueue(int head, int appendee) {
+        int next = reqArenaMgr.getNextRequest(head);
+        while(next != -1) {
+            head = next;
+            next = reqArenaMgr.getNextRequest(head);
+        }
+        reqArenaMgr.setNextRequest(head, appendee);        
+    }
+    
+    private int getNewMaxMode(int resource, int oldMaxMode) {
+        int newMaxMode = LockMode.NL;
+        int holder = resArenaMgr.getLastHolder(resource);
+        while (holder != -1) {
+            int curLockMode = reqArenaMgr.getLockMode(holder);
+            if (curLockMode == oldMaxMode) {
+                // we have another lock of the same mode - we're done
+                return oldMaxMode;
+            }
+            switch (ACTION_MATRIX[newMaxMode][curLockMode]) {
+                case UPD:
+                    newMaxMode = curLockMode;
+                    break;
+                case GET:
+                    break;
+                case WAIT:
+                    throw new IllegalStateException("incompatible locks in holder queue");
+            }
+        }
+        return newMaxMode;        
+    }
+    
+    private boolean resourceNotUsed(int resource) {
+        return resArenaMgr.getLastHolder(resource) == -1
+                && resArenaMgr.getFirstUpgrader(resource) == -1
+                && resArenaMgr.getFirstWaiter(resource) == -1;
+    }
+    
+
+
+    private void validateJob(ITransactionContext txnContext) throws ACIDException {
+        if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
+            throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+        } else if (txnContext.isTimeout()) {
+            requestAbort(txnContext);
+        }
+    }
+
+    //@Override
+    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+            throws ACIDException {
+        throw new IllegalStateException();
+        //internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+    }
+
+    private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+            throws ACIDException {
+        throw new IllegalStateException();
+        //internalUnlock(datasetId, entityHashValue, txnContext, true, false);
+    }
+
+    @Override
+    public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+
+        //        try {
+        //            internalLock(datasetId, entityHashValue, lockMode, txnContext);
+        //            return;
+        //        } finally {
+        //            unlock(datasetId, entityHashValue, txnContext);
+        //        }
+        throw new IllegalStateException();
+        //internalLock(datasetId, entityHashValue, lockMode, txnContext, true);
+        //instantUnlock(datasetId, entityHashValue, txnContext);
+    }
+
+    @Override
+    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+            throws ACIDException {
+        throw new IllegalStateException();
+        //return internalTryLock(datasetId, entityHashValue, lockMode, txnContext, false);
+    }
+
+    @Override
+    public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        throw new IllegalStateException();
+        //return internalInstantTryLock(datasetId, entityHashValue, lockMode, txnContext);
+    }
+
+    private void trackLockRequest(String msg, int requestType, DatasetId datasetIdObj, int entityHashValue,
+            byte lockMode, ITransactionContext txnContext, DatasetLockInfo dLockInfo, int eLockInfo) {
+/*
+        StringBuilder s = new StringBuilder();
+        LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, datasetIdObj,
+                entityHashValue, lockMode, txnContext);
+        s.append(Thread.currentThread().getId() + ":");
+        s.append(msg);
+        if (msg.equals("Granted")) {
+            if (dLockInfo != null) {
+                s.append("\t|D| ");
+                s.append(dLockInfo.getIXCount()).append(",");
+                s.append(dLockInfo.getISCount()).append(",");
+                s.append(dLockInfo.getXCount()).append(",");
+                s.append(dLockInfo.getSCount()).append(",");
+                if (dLockInfo.getFirstUpgrader() != -1) {
+                    s.append("+");
+                } else {
+                    s.append("-");
+                }
+                s.append(",");
+                if (dLockInfo.getFirstWaiter() != -1) {
+                    s.append("+");
+                } else {
+                    s.append("-");
+                }
+            }
+
+            if (eLockInfo != -1) {
+                s.append("\t|E| ");
+                s.append(entityLockInfoManager.getXCount(eLockInfo)).append(",");
+                s.append(entityLockInfoManager.getSCount(eLockInfo)).append(",");
+                if (entityLockInfoManager.getUpgrader(eLockInfo) != -1) {
+                    s.append("+");
+                } else {
+                    s.append("-");
+                }
+                s.append(",");
+                if (entityLockInfoManager.getFirstWaiter(eLockInfo) != -1) {
+                    s.append("+");
+                } else {
+                    s.append("-");
+                }
+            }
+        }
+
+        lockRequestTracker.addEvent(s.toString(), request);
+        if (msg.equals("Requested")) {
+            lockRequestTracker.addRequest(request);
+        }
+        System.out.println(request.prettyPrint() + "--> " + s.toString());
+*/
+    }
+
+    public String getHistoryForAllJobs() {
+/*        if (IS_DEBUG_MODE) {
+            return lockRequestTracker.getHistoryForAllJobs();
+        }
+*/
+        return null;
+    }
+
+    public String getHistoryPerJob() {
+/*        if (IS_DEBUG_MODE) {
+            return lockRequestTracker.getHistoryPerJob();
+        }
+*/
+        return null;
+    }
+
+    public String getRequestHistoryForAllJobs() {
+/*        if (IS_DEBUG_MODE) {
+            return lockRequestTracker.getRequestHistoryForAllJobs();
+        }
+*/
+        return null;
+    }
+
+    private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+        txnContext.setTimeout(true);
+        throw new ACIDException("Transaction " + txnContext.getJobId()
+                + " should abort (requested by the Lock Manager)");
+    }
+
+    /**
+     * For now, upgrading lock granule from entity-granule to dataset-granule is not supported!!
+     * 
+     * @param fromLockMode
+     * @param toLockMode
+     * @return
+     */
+    private boolean isLockUpgrade(byte fromLockMode, byte toLockMode) {
+        return fromLockMode == LockMode.S && toLockMode == LockMode.X;
+    }
+
+    @Override
+    public String prettyPrint() throws ACIDException {
+        StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
+        return s + "\n";
+    }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+
+            //#. dump Configurable Variables
+            dumpConfVars(os);
+
+            //#. dump jobHT
+            dumpJobInfo(os);
+
+            //#. dump datasetResourceHT
+            dumpDatasetLockInfo(os);
+
+            //#. dump entityLockInfoManager
+            dumpEntityLockInfo(os);
+
+            //#. dump entityInfoManager
+            dumpEntityInfo(os);
+
+            //#. dump lockWaiterManager
+
+            dumpLockWaiterInfo(os);
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpConfVars(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: "
+                    + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
+//            sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+//                    + entityLockInfoManager.getShrinkTimerThreshold());
+//            sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
+//            sb.append("\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): " + lockWaiterManager.getShrinkTimerThreshold());
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpJobInfo(OutputStream os) {
+        JobId jobId;
+        JobInfo jobInfo;
+        StringBuilder sb = new StringBuilder();
+/*
+        try {
+            sb.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+            Set<Map.Entry<JobId, JobInfo>> entrySet = jobHT.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<JobId, JobInfo> entry : entrySet) {
+                    if (entry != null) {
+                        jobId = entry.getKey();
+                        if (jobId != null) {
+                            sb.append("\n" + jobId);
+                        } else {
+                            sb.append("\nJID:null");
+                        }
+
+                        jobInfo = entry.getValue();
+                        if (jobInfo != null) {
+                            sb.append(jobInfo.coreDump());
+                        } else {
+                            sb.append("\nJobInfo:null");
+                        }
+                    }
+                }
+            }
+            sb.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+*/
+    }
+
+    private void dumpDatasetLockInfo(OutputStream os) {
+/*
+        DatasetId datasetId;
+        DatasetLockInfo datasetLockInfo;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+            Set<Map.Entry<DatasetId, DatasetLockInfo>> entrySet = datasetResourceHT.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<DatasetId, DatasetLockInfo> entry : entrySet) {
+                    if (entry != null) {
+                        datasetId = entry.getKey();
+                        if (datasetId != null) {
+                            sb.append("\nDatasetId:" + datasetId.getId());
+                        } else {
+                            sb.append("\nDatasetId:null");
+                        }
+
+                        datasetLockInfo = entry.getValue();
+                        if (datasetLockInfo != null) {
+                            sb.append(datasetLockInfo.coreDump());
+                        } else {
+                            sb.append("\nDatasetLockInfo:null");
+                        }
+                    }
+                    sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+                    os.write(sb.toString().getBytes());
+
+                    //create a new sb to avoid possible OOM exception
+                    sb = new StringBuilder();
+                }
+            }
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+*/
+    }
+
+    private void dumpEntityLockInfo(OutputStream os) {
+/*
+        StringBuilder sb = new StringBuilder();
+        try {
+            sb.append("\n>>dump_begin\t>>----- [EntityLockInfo] -----");
+            entityLockInfoManager.coreDump(os);
+            sb.append("\n>>dump_end\t>>----- [EntityLockInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+*/
+    }
+
+    private void dumpEntityInfo(OutputStream os) {
+/*
+        StringBuilder sb = new StringBuilder();
+        try {
+            sb.append("\n>>dump_begin\t>>----- [EntityInfo] -----");
+            entityInfoManager.coreDump(os);
+            sb.append("\n>>dump_end\t>>----- [EntityInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+*/
+    }
+
+    private void dumpLockWaiterInfo(OutputStream os) {
+/*
+        StringBuilder sb = new StringBuilder();
+        try {
+            sb.append("\n>>dump_begin\t>>----- [LockWaiterInfo] -----");
+            lockWaiterManager.coreDump(os);
+            sb.append("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+*/
+    }
+
+    private static class ResourceGroupTable {
+        public static final int TABLE_SIZE = 10; // TODO increase
+
+        private ResourceGroup[] table;
+        
+        public ResourceGroupTable() {
+            for (int i = 0; i < TABLE_SIZE; ++i) {
+                table[i] = new ResourceGroup();
+            }
+        }
+        
+        ResourceGroup get(DatasetId dId, int entityHashValue) {
+            // TODO ensure good properties of hash function
+            int h = dId.getId() ^ entityHashValue;
+            return table[h % TABLE_SIZE];
+        }
+    }
+    
+    private static class ResourceGroup {
+        private ReentrantReadWriteLock latch;
+        private Condition condition;
+        AtomicInteger firstResourceIndex;
+
+        ResourceGroup() {
+            latch = new ReentrantReadWriteLock();
+            condition = latch.writeLock().newCondition();
+            firstResourceIndex = new AtomicInteger(-1);
+        }
+        
+        void getLatch() {
+            latch.writeLock().lock();
+        }
+        
+        void releaseLatch() {
+            latch.writeLock().unlock();
+        }
+        
+        boolean hasWaiters() {
+            return latch.hasQueuedThreads();
+        }
+        
+        void await() throws InterruptedException {
+            condition.await();
+        }
+        
+        void wakeUp() {
+            condition.signalAll();
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
new file mode 100644
index 0000000..c530883
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
@@ -0,0 +1,116 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import edu.uci.ics.asterix.transaction.management.service.locking.RecordType.Field;
+
+public class Generator {
+    public static void main(String args[]) {
+        
+        RecordType resource = new RecordType("Resource");
+        resource.addField("last holder",    RecordType.Type.INT, "-1");
+        resource.addField("first waiter",   RecordType.Type.INT, "-1");
+        resource.addField("first upgrader", RecordType.Type.INT, "-1");
+        resource.addField("max mode",       RecordType.Type.INT, "LockMode.NL");
+        resource.addField("dataset id",     RecordType.Type.INT, null);
+        resource.addField("pk hash val",    RecordType.Type.INT, null);
+        resource.addField("next",           RecordType.Type.INT, null);
+        
+        RecordType request = new RecordType("Request");
+        request.addField("resource id",      RecordType.Type.INT,  null);
+        request.addField("lock mode",        RecordType.Type.INT,  null);
+        request.addField("job id",           RecordType.Type.INT,  null);
+        request.addField("prev job request", RecordType.Type.INT,  null);
+        request.addField("next job request", RecordType.Type.INT,  null);
+        request.addField("next request",     RecordType.Type.INT,  null);
+
+        
+        StringBuilder sb = new StringBuilder();
+
+        //generateMemoryManagerSource(request, sb);
+        //generateMemoryManagerSource(resource, sb);
+        generateArenaManagerSource(request, sb);
+        //generateArenaManagerSource(resource, sb);
+
+        System.out.println(sb.toString());
+    }
+
+    private static void generateMemoryManagerSource(RecordType resource, StringBuilder sb) {
+        InputStream is = resource.getClass().getResourceAsStream("StructuredMemoryManager.java.txt");
+        if (is == null) {
+            throw new IllegalStateException();
+        }
+        BufferedReader in = new BufferedReader(new InputStreamReader(is));
+        String line = null;
+
+        try {
+
+            String indent = "    ";
+
+            while((line = in.readLine()) != null) {
+                if (line.contains("@E@")) {
+                    line = line.replace("@E@", resource.name);
+                }
+                if (line.contains("@CONSTS@")) {
+                    resource.appendConstants(sb, indent, 1);
+                    sb.append('\n');
+                } else if (line.contains("@METHODS@")) {
+                    for (int i = 0; i < resource.size(); ++i) {
+                        final Field field = resource.fields.get(i);
+                        field.appendMemoryManagerGetMethod(sb, indent, 1);
+                        sb.append('\n');
+                        field.appendMemoryManagerSetMethod(sb, indent, 1);
+                        sb.append('\n');
+                    }
+                } else if (line.contains("@INIT_SLOT@")) {
+                    for (int i = 0; i < resource.size(); ++i) {                        
+                        final Field field = resource.fields.get(i);
+                        field.appendInitializers(sb, indent, 1);
+                    }
+                } else {
+                  sb.append(line).append('\n');
+                }
+            }
+
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+    }
+
+    private static void generateArenaManagerSource(RecordType resource, StringBuilder sb) {
+        InputStream is = resource.getClass().getResourceAsStream("ArenaManager.java.txt");
+        if (is == null) {
+            throw new IllegalStateException();
+        }
+        BufferedReader in = new BufferedReader(new InputStreamReader(is));
+        String line = null;
+
+        try {
+
+            String indent = "    ";
+
+            while((line = in.readLine()) != null) {
+                if (line.contains("@E@")) {
+                    line = line.replace("@E@", resource.name);
+                }
+                if (line.contains("@METHODS@")) {
+                    for (int i = 0; i < resource.size(); ++i) {
+                        final Field field = resource.fields.get(i);
+                        field.appendArenaManagerGetMethod(sb, indent, 1);
+                        sb.append('\n');
+                        field.appendArenaManagerSetMethod(sb, indent, 1);
+                        sb.append('\n');
+                    }
+                } else {
+                  sb.append(line).append('\n');
+                }
+            }
+
+        } catch (IOException ioe) {
+            ioe.printStackTrace();
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
new file mode 100644
index 0000000..dc0f252
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
@@ -0,0 +1,235 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class RecordType {
+    
+    enum Type {
+        BYTE,
+        SHORT,
+        INT
+    }
+    
+    static class Field {
+        
+        String name;
+        Type type;
+        String initial;
+        int offset;
+
+        Field(String name, Type type, String initial, int offset) {
+            this.name = name;
+            this.type = type;
+            this.initial = initial;
+            this.offset = offset;
+        }
+        
+        String methodName(String prefix) {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(prefix);
+            for (int j = 0; j < words.length; ++j) {
+                String word = words[j];
+                sb.append(word.substring(0, 1).toUpperCase());
+                sb.append(word.substring(1));
+            }
+            return sb.toString();        
+        }
+                
+        StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public ")
+              .append(name(type))
+              .append(' ')
+              .append(methodName("get"))
+              .append("(int slotNum) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("return b.")
+              .append(bbGetter(type))
+              .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(");\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+            
+        StringBuilder appendMemoryManagerSetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public void ")
+              .append(methodName("set"))
+              .append("(int slotNum, ")
+              .append(name(type))
+              .append(" value) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("  b.")
+              .append(bbSetter(type))
+              .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+              .append(offsetName())
+              .append(", value);\n");
+            sb = indent(sb, indent, level);
+            sb.append(indent)
+              .append("}\n");
+            return sb;
+        }
+
+        StringBuilder appendArenaManagerSetThreadLocal(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("final int arenaId = arenaId(slotNum);\n");
+            sb = indent(sb, indent, level);
+            sb.append("if (arenaId != local.get().arenaId) {\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("local.get().arenaId = arenaId;\n");
+            sb = indent(sb, indent, level + 1);
+            sb.append("local.get().mgr = get(arenaId);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
+        StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public ")
+              .append(name(type))
+              .append(' ')
+              .append(methodName("get"))
+              .append("(int slotNum) {\n");
+            sb = appendArenaManagerSetThreadLocal(sb, indent, level + 1);
+            sb = indent(sb, indent, level + 1);
+            sb.append("return local.get().mgr.")
+              .append(methodName("get"))
+              .append("(localId(slotNum));\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+            
+        StringBuilder appendArenaManagerSetMethod(StringBuilder sb, String indent, int level) {
+            sb = indent(sb, indent, level);
+            sb.append("public void ")
+              .append(methodName("set"))
+              .append("(int slotNum, ")
+              .append(name(type))
+              .append(" value) {\n");
+            sb = appendArenaManagerSetThreadLocal(sb, indent, level + 1);
+            sb = indent(sb, indent, level + 1);
+            sb.append("local.get().mgr.")
+              .append(methodName("set"))
+              .append("(localId(slotNum), value);\n");
+            sb = indent(sb, indent, level);
+            sb.append("}\n");
+            return sb;
+        }
+        
+        StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+            if (initial != null) {
+                sb = indent(sb, indent, level);
+                sb.append("bb.")
+                  .append(bbSetter(type))
+                  .append("(currentSlot * ITEM_SIZE + ")
+                  .append(offsetName())
+                  .append(", ")
+                  .append(initial)
+                  .append(");\n");
+            }
+            return sb;
+        }
+        
+        String offsetName() {
+            String words[] = name.split(" ");
+            assert(words.length > 0);
+            StringBuilder sb = new StringBuilder(words[0].toUpperCase());
+            for (int j = 1; j < words.length; ++j) {
+                sb.append("_").append(words[j].toUpperCase());
+            }
+            sb.append("_OFF");
+            return sb.toString();
+        }
+        
+        int offset() {
+            return offset;
+        }
+    }
+
+    String name;
+    ArrayList<Field> fields;
+    int totalSize;
+    
+    static StringBuilder indent(StringBuilder sb, String indent, int level) {
+        for (int i = 0; i < level; ++i) {
+            sb.append(indent);
+        }
+        return sb;
+    }
+    
+    RecordType(String name) {
+        this.name = name;
+        fields = new ArrayList<Field>();
+        totalSize = 0;
+    }
+    
+    void addField(String name, Type type, String initial) {
+        fields.add(new Field(name, type, initial, totalSize));
+        totalSize += size(type);
+    }
+     
+    int size() {
+        return fields.size();
+    }
+    
+    static int size(Type t) {
+        switch(t) {
+            case BYTE:  return 1;
+            case SHORT: return 2;
+            case INT:   return 4;
+            default:    throw new IllegalArgumentException();
+        }
+    }
+    
+    static String name(Type t) {
+        switch(t) {
+            case BYTE:  return "byte";
+            case SHORT: return "short";
+            case INT:   return "int";
+            default:    throw new IllegalArgumentException();
+        }
+    }
+    
+    static String bbGetter(Type t) {
+        switch(t) {
+            case BYTE:  return "get";
+            case SHORT: return "getShort";
+            case INT:   return "getInt";
+            default:    throw new IllegalArgumentException();
+        }
+    }
+    
+    static String bbSetter(Type t) {
+        switch(t) {
+            case BYTE:  return "put";
+            case SHORT: return "putShort";
+            case INT:   return "putInt";
+            default:    throw new IllegalArgumentException();
+        }
+    }
+    
+    StringBuilder appendConstants(StringBuilder sb, String indent, int level) {
+        sb = indent(sb, indent, level);
+        sb.append("public static int ITEM_SIZE = ")
+          .append(totalSize)
+          .append(";\n");
+        for (int i = 0; i < fields.size(); ++i) {
+            final Field field = fields.get(i);
+            sb = indent(sb, indent, level);
+            sb.append("public static int ")
+              .append(field.offsetName())
+              .append(" = ")
+              .append(field.offset).append(";\n");
+        }
+        return sb;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
new file mode 100644
index 0000000..aea1f47
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class RequestArenaManager {
+    
+    private ArrayList<RequestMemoryManager> arenas;
+    private volatile int nextArena; 
+    private ThreadLocal<LocalManager> local;    
+    
+    public RequestArenaManager() {
+        int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+        arenas = new ArrayList<RequestMemoryManager>(noArenas);
+        local = new ThreadLocal<LocalManager>() {
+            @Override
+            protected LocalManager initialValue() {
+                return getNext();
+            }
+        };
+    }
+    
+    public static int arenaId(int i) {
+        return (i >> 24) & 0xff;
+    }
+
+    public static int localId(int i) {
+        return i & 0xffffff;
+    }
+
+    public synchronized LocalManager getNext() {
+        RequestMemoryManager mgr = arenas.get(nextArena);
+        if (mgr == null) {
+            mgr = new RequestMemoryManager();
+            arenas.set(nextArena, mgr);
+        }
+        LocalManager res = new LocalManager();
+        res.mgr = mgr;
+        res.arenaId = nextArena;
+        nextArena = (nextArena + 1) % arenas.size();
+        return res;
+    }
+    
+    public RequestMemoryManager get(int i) {
+        return arenas.get(i);
+    }
+    
+    public RequestMemoryManager local() {
+        return local.get().mgr;
+    }
+    
+    public int getResourceId(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getResourceId(localId(slotNum));
+    }
+
+    public void setResourceId(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setResourceId(localId(slotNum), value);
+    }
+
+    public int getLockMode(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getLockMode(localId(slotNum));
+    }
+
+    public void setLockMode(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setLockMode(localId(slotNum), value);
+    }
+
+    public int getJobId(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getJobId(localId(slotNum));
+    }
+
+    public void setJobId(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setJobId(localId(slotNum), value);
+    }
+
+    public int getPrevJobRequest(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getPrevJobRequest(localId(slotNum));
+    }
+
+    public void setPrevJobRequest(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setPrevJobRequest(localId(slotNum), value);
+    }
+
+    public int getNextJobRequest(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getNextJobRequest(localId(slotNum));
+    }
+
+    public void setNextJobRequest(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setNextJobRequest(localId(slotNum), value);
+    }
+
+    public int getNextRequest(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getNextRequest(localId(slotNum));
+    }
+
+    public void setNextRequest(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setNextRequest(localId(slotNum), value);
+    }
+
+    
+    static class LocalManager {
+        int arenaId;
+        RequestMemoryManager mgr;
+    }
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
new file mode 100644
index 0000000..df139b5
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
@@ -0,0 +1,273 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class RequestMemoryManager {
+
+    public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
+
+    static final int NO_SLOTS = 10;
+    static final int NEXT_FREE_SLOT_OFFSET = 0;
+
+    public static int ITEM_SIZE = 24;
+    public static int RESOURCE_ID_OFF = 0;
+    public static int LOCK_MODE_OFF = 4;
+    public static int JOB_ID_OFF = 8;
+    public static int PREV_JOB_REQUEST_OFF = 12;
+    public static int NEXT_JOB_REQUEST_OFF = 16;
+    public static int NEXT_REQUEST_OFF = 20;
+
+
+    private ArrayList<Buffer> buffers;
+    private int current;
+    private int occupiedSlots;
+    private long shrinkTimer;
+    private boolean isShrinkTimerOn;
+    
+    public int allocate() {
+        if (buffers.get(current).isFull()) {
+            int size = buffers.size();
+            boolean needNewBuffer = true;
+            Buffer buffer;
+
+            for (int i = 0; i < size; i++) {
+                buffer = buffers.get(i);
+                if (! buffer.isInitialized()) {
+                    buffer.initialize();
+                    current = i;
+                    needNewBuffer = false;
+                    break;
+                }
+            }
+
+            if (needNewBuffer) {
+                buffers.add(new Buffer());
+                current = buffers.size() - 1;
+            }
+        }
+        ++occupiedSlots;
+        return buffers.get(current).allocate() + current * NO_SLOTS;
+    }
+
+    void deallocate(int slotNum) {
+        buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+        --occupiedSlots;
+
+        if (needShrink()) {
+            shrink();
+        }
+    }
+
+    /**
+     * Shrink policy:
+     * Shrink when the resource under-utilization lasts for a certain amount of time.
+     * TODO Need to figure out which of the policies is better
+     * case1.
+     * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+     * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+     * even if there is nothing to shrink or deallocate.
+     * It doesn't distinguish the deinitialized children from initialized children
+     * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+     * In other words, it doesn't subtract the deinitialized children's slots.
+     * case2.
+     * buffers status : O O x x x x x
+     * However, in the above case, if we subtract the deinitialized children's slots,
+     * needShrink() will return false even if we shrink the buffers at this case.
+     * 
+     * @return
+     */
+    private boolean needShrink() {
+        int size = buffers.size();
+        int usedSlots = occupiedSlots;
+        if (usedSlots == 0) {
+            usedSlots = 1;
+        }
+
+        if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+            if (isShrinkTimerOn) {
+                if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+                    isShrinkTimerOn = false;
+                    return true;
+                }
+            } else {
+                //turn on timer
+                isShrinkTimerOn = true;
+                shrinkTimer = System.currentTimeMillis();
+            }
+        } else {
+            //turn off timer
+            isShrinkTimerOn = false;
+        }
+
+        return false;
+    }
+
+    /**
+     * Shrink() may
+     * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+     * shrink buffers according to the deinitialized children's contiguity status.
+     * It doesn't deinitialze or shrink more than half of children at a time.
+     */
+    private void shrink() {
+        int i;
+        int removeCount = 0;
+        int size = buffers.size();
+        int maxDecreaseCount = size / 2;
+        Buffer buffer;
+
+        //The first buffer never be deinitialized.
+        for (i = 1; i < size; i++) {
+            if (buffers.get(i).isEmpty()) {
+                buffers.get(i).deinitialize();
+            }
+        }
+
+        //remove the empty buffers from the end
+        for (i = size - 1; i >= 1; i--) {
+            buffer = buffers.get(i);
+            if (! buffer.isInitialized()) {
+                buffers.remove(i);
+                if (++removeCount == maxDecreaseCount) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        
+        //reset allocChild to the first buffer
+        current = 0;
+
+        isShrinkTimerOn = false;
+    }
+
+    public int getResourceId(int slotNum) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+        return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + RESOURCE_ID_OFF);
+    }
+
+    public void setResourceId(int slotNum, int value) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+          b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + RESOURCE_ID_OFF, value);
+        }
+
+    public int getLockMode(int slotNum) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+        return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + LOCK_MODE_OFF);
+    }
+
+    public void setLockMode(int slotNum, int value) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+          b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LOCK_MODE_OFF, value);
+        }
+
+    public int getJobId(int slotNum) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+        return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + JOB_ID_OFF);
+    }
+
+    public void setJobId(int slotNum, int value) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+          b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + JOB_ID_OFF, value);
+        }
+
+    public int getPrevJobRequest(int slotNum) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+        return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + PREV_JOB_REQUEST_OFF);
+    }
+
+    public void setPrevJobRequest(int slotNum, int value) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+          b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PREV_JOB_REQUEST_OFF, value);
+        }
+
+    public int getNextJobRequest(int slotNum) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+        return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_JOB_REQUEST_OFF);
+    }
+
+    public void setNextJobRequest(int slotNum, int value) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+          b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_JOB_REQUEST_OFF, value);
+        }
+
+    public int getNextRequest(int slotNum) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+        return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_REQUEST_OFF);
+    }
+
+    public void setNextRequest(int slotNum, int value) {
+        final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+          b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_REQUEST_OFF, value);
+        }
+
+    
+
+    static class Buffer {
+        private ByteBuffer bb;
+        private int freeSlotNum;
+        private int occupiedSlots; //-1 represents 'deinitialized' state.
+
+        void initialize() {
+            bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+            freeSlotNum = 0;
+            occupiedSlots = 0;
+
+            for (int i = 0; i < NO_SLOTS - 1; i++) {
+                setNextFreeSlot(i, i + 1);
+            }
+            setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+        }
+        
+        public void deinitialize() {
+            bb = null;
+            occupiedSlots = -1;
+        }
+        
+        public boolean isInitialized() {
+            return occupiedSlots >= 0;
+        }
+
+        public boolean isFull() {
+            return occupiedSlots == NO_SLOTS;
+        }
+
+        public boolean isEmpty() {
+            return occupiedSlots == 0;
+        }
+        
+        public int allocate() {
+            int currentSlot = freeSlotNum;
+            freeSlotNum = getNextFreeSlot(currentSlot);
+            occupiedSlots++;
+            if (LockManager.IS_DEBUG_MODE) {
+                System.out.println(Thread.currentThread().getName()
+                                   + " allocate: " + currentSlot);
+            }
+            return currentSlot;
+        }
+    
+        public void deallocate(int slotNum) {
+            setNextFreeSlot(slotNum, freeSlotNum);
+            freeSlotNum = slotNum;
+            occupiedSlots--;
+            if (LockManager.IS_DEBUG_MODE) {
+                System.out.println(Thread.currentThread().getName()
+                                   + " deallocate: " + slotNum);
+            }
+        }
+
+        public int getNextFreeSlot(int slotNum) {
+            return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET);
+        }    
+            
+        public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+            bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET, nextFreeSlot);
+        }
+    }
+    
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
new file mode 100644
index 0000000..b7bceca
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
@@ -0,0 +1,183 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class ResourceArenaManager {
+    
+    private ArrayList<ResourceMemoryManager> arenas;
+    private volatile int nextArena; 
+    private ThreadLocal<LocalManager> local;    
+    
+    public ResourceArenaManager() {
+        int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+        arenas = new ArrayList<ResourceMemoryManager>(noArenas);
+        local = new ThreadLocal<LocalManager>() {
+            @Override
+            protected LocalManager initialValue() {
+                return getNext();
+            }
+        };
+    }
+    
+    public static int arenaId(int i) {
+        return (i >> 24) & 0xff;
+    }
+
+    public static int localId(int i) {
+        return i & 0xffffff;
+    }
+
+    public synchronized LocalManager getNext() {
+        ResourceMemoryManager mgr = arenas.get(nextArena);
+        if (mgr == null) {
+            mgr = new ResourceMemoryManager();
+            arenas.set(nextArena, mgr);
+        }
+        LocalManager res = new LocalManager();
+        res.mgr = mgr;
+        res.arenaId = nextArena;
+        nextArena = (nextArena + 1) % arenas.size();
+        return res;
+    }
+    
+    public synchronized ResourceMemoryManager get(int i) {
+        return arenas.get(i);
+    }
+    
+    public ResourceMemoryManager local() {
+        return local.get().mgr;
+    }
+    
+    public int getLastHolder(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getLastHolder(localId(slotNum));
+    }
+
+    public void setLastHolder(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setLastHolder(localId(slotNum), value);
+    }
+
+    public int getFirstWaiter(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getFirstWaiter(localId(slotNum));
+    }
+
+    public void setFirstWaiter(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setFirstWaiter(localId(slotNum), value);
+    }
+
+    public int getFirstUpgrader(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getFirstUpgrader(localId(slotNum));
+    }
+
+    public void setFirstUpgrader(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setFirstUpgrader(localId(slotNum), value);
+    }
+
+    public int getMaxMode(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getMaxMode(localId(slotNum));
+    }
+
+    public void setMaxMode(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setMaxMode(localId(slotNum), value);
+    }
+
+    public int getDatasetId(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getDatasetId(localId(slotNum));
+    }
+
+    public void setDatasetId(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setDatasetId(localId(slotNum), value);
+    }
+
+    public int getPkHashVal(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getPkHashVal(localId(slotNum));
+    }
+
+    public void setPkHashVal(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setPkHashVal(localId(slotNum), value);
+    }
+
+    public int getNext(int slotNum) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        return local.get().mgr.getNext(localId(slotNum));
+    }
+
+    public void setNext(int slotNum, int value) {
+        final int arenaId = arenaId(slotNum);
+        if (arenaId != local.get().arenaId) {
+            local.get().arenaId = arenaId;
+            local.get().mgr = get(arenaId);
+        }
+        local.get().mgr.setNext(localId(slotNum), value);
+    }
+
+    
+    static class LocalManager {
+        int arenaId;
+        ResourceMemoryManager mgr;
+    }
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
new file mode 100644
index 0000000..b49c7f8
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
@@ -0,0 +1,288 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class ResourceMemoryManager {
+
+    public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
+
+    static final int NO_SLOTS = 10;
+    static final int NEXT_FREE_SLOT_OFFSET = 0;
+
+    public static int ITEM_SIZE = 28;
+    public static int LAST_HOLDER_OFF = 0;
+    public static int FIRST_WAITER_OFF = 4;
+    public static int FIRST_UPGRADER_OFF = 8;
+    public static int MAX_MODE_OFF = 12;
+    public static int DATASET_ID_OFF = 16;
+    public static int PK_HASH_VAL_OFF = 20;
+    public static int NEXT_OFF = 24;
+
+
+    private ArrayList<Buffer> buffers;
+    private int current;
+    private int occupiedSlots;
+    private long shrinkTimer;
+    private boolean isShrinkTimerOn;
+    
+    public int allocate() {
+        if (buffers.get(current).isFull()) {
+            int size = buffers.size();
+            boolean needNewBuffer = true;
+            Buffer buffer;
+
+            for (int i = 0; i < size; i++) {
+                buffer = buffers.get(i);
+                if (! buffer.isInitialized()) {
+                    buffer.initialize();
+                    current = i;
+                    needNewBuffer = false;
+                    break;
+                }
+            }
+
+            if (needNewBuffer) {
+                buffers.add(new Buffer());
+                current = buffers.size() - 1;
+            }
+        }
+        ++occupiedSlots;
+        return buffers.get(current).allocate() + current * NO_SLOTS;
+    }
+
+    void deallocate(int slotNum) {
+        buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+        --occupiedSlots;
+
+        if (needShrink()) {
+            shrink();
+        }
+    }
+
+    /**
+     * Shrink policy:
+     * Shrink when the resource under-utilization lasts for a certain amount of time.
+     * TODO Need to figure out which of the policies is better
+     * case1.
+     * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+     * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+     * even if there is nothing to shrink or deallocate.
+     * It doesn't distinguish the deinitialized children from initialized children
+     * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+     * In other words, it doesn't subtract the deinitialized children's slots.
+     * case2.
+     * buffers status : O O x x x x x
+     * However, in the above case, if we subtract the deinitialized children's slots,
+     * needShrink() will return false even if we shrink the buffers at this case.
+     * 
+     * @return
+     */
+    private boolean needShrink() {
+        int size = buffers.size();
+        int usedSlots = occupiedSlots;
+        if (usedSlots == 0) {
+            usedSlots = 1;
+        }
+
+        if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+            if (isShrinkTimerOn) {
+                if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+                    isShrinkTimerOn = false;
+                    return true;
+                }
+            } else {
+                //turn on timer
+                isShrinkTimerOn = true;
+                shrinkTimer = System.currentTimeMillis();
+            }
+        } else {
+            //turn off timer
+            isShrinkTimerOn = false;
+        }
+
+        return false;
+    }
+
+    /**
+     * Shrink() may
+     * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+     * shrink buffers according to the deinitialized children's contiguity status.
+     * It doesn't deinitialze or shrink more than half of children at a time.
+     */
+    private void shrink() {
+        int i;
+        int removeCount = 0;
+        int size = buffers.size();
+        int maxDecreaseCount = size / 2;
+        Buffer buffer;
+
+        //The first buffer never be deinitialized.
+        for (i = 1; i < size; i++) {
+            if (buffers.get(i).isEmpty()) {
+                buffers.get(i).deinitialize();
+            }
+        }
+
+        //remove the empty buffers from the end
+        for (i = size - 1; i >= 1; i--) {
+            buffer = buffers.get(i);
+            if (! buffer.isInitialized()) {
+                buffers.remove(i);
+                if (++removeCount == maxDecreaseCount) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        
+        //reset allocChild to the first buffer
+        current = 0;
+
+        isShrinkTimerOn = false;
+    }
+
+    public int getLastHolder(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF);
+    }
+
+    public void setLastHolder(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF, value);
+    }
+
+    public int getFirstWaiter(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF);
+    }
+
+    public void setFirstWaiter(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF, value);
+    }
+
+    public int getFirstUpgrader(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF);
+    }
+
+    public void setFirstUpgrader(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF, value);
+    }
+
+    public int getMaxMode(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF);
+    }
+
+    public void setMaxMode(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF, value);
+    }
+
+    public int getDatasetId(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF);
+    }
+
+    public void setDatasetId(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF, value);
+    }
+
+    public int getPkHashVal(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF);
+    }
+
+    public void setPkHashVal(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF, value);
+    }
+
+    public int getNext(int slotNum) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF);
+    }
+
+    public void setNext(int slotNum, int value) {
+      final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+      b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF, value);
+    }
+
+    
+
+    static class Buffer {
+        private ByteBuffer bb;
+        private int freeSlotNum;
+        private int occupiedSlots; //-1 represents 'deinitialized' state.
+
+        void initialize() {
+            bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+            freeSlotNum = 0;
+            occupiedSlots = 0;
+
+            for (int i = 0; i < NO_SLOTS - 1; i++) {
+                setNextFreeSlot(i, i + 1);
+            }
+            setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+        }
+        
+        public void deinitialize() {
+            bb = null;
+            occupiedSlots = -1;
+        }
+        
+        public boolean isInitialized() {
+            return occupiedSlots >= 0;
+        }
+
+        public boolean isFull() {
+            return occupiedSlots == NO_SLOTS;
+        }
+
+        public boolean isEmpty() {
+            return occupiedSlots == 0;
+        }
+        
+        public int allocate() {
+            int currentSlot = freeSlotNum;
+            freeSlotNum = getNextFreeSlot(currentSlot);
+            bb.putInt(currentSlot * ITEM_SIZE + LAST_HOLDER_OFF, -1);
+            bb.putInt(currentSlot * ITEM_SIZE + FIRST_WAITER_OFF, -1);
+            bb.putInt(currentSlot * ITEM_SIZE + FIRST_UPGRADER_OFF, -1);
+            bb.putInt(currentSlot * ITEM_SIZE + MAX_MODE_OFF, LockMode.NL);
+            occupiedSlots++;
+            if (LockManager.IS_DEBUG_MODE) {
+                System.out.println(Thread.currentThread().getName()
+                                   + " allocate: " + currentSlot);
+            }
+            return currentSlot;
+        }
+    
+        public void deallocate(int slotNum) {
+            setNextFreeSlot(slotNum, freeSlotNum);
+            freeSlotNum = slotNum;
+            occupiedSlots--;
+            if (LockManager.IS_DEBUG_MODE) {
+                System.out.println(Thread.currentThread().getName()
+                                   + " deallocate: " + slotNum);
+            }
+        }
+
+        public int getNextFreeSlot(int slotNum) {
+            return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET);
+        }    
+            
+        public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+            bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET, nextFreeSlot);
+        }
+    }
+    
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
new file mode 100644
index 0000000..d28919d
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
@@ -0,0 +1,207 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class @E@MemoryManager {
+
+    public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
+
+    static final int NO_SLOTS = 10;
+    static final int NEXT_FREE_SLOT_OFFSET = 0;
+
+    @CONSTS@
+
+    private ArrayList<Buffer> buffers;
+    private int current;
+    private int occupiedSlots;
+    private long shrinkTimer;
+    private boolean isShrinkTimerOn;
+    
+    public int allocate() {
+        if (buffers.get(current).isFull()) {
+            int size = buffers.size();
+            boolean needNewBuffer = true;
+            Buffer buffer;
+
+            for (int i = 0; i < size; i++) {
+                buffer = buffers.get(i);
+                if (! buffer.isInitialized()) {
+                    buffer.initialize();
+                    current = i;
+                    needNewBuffer = false;
+                    break;
+                }
+            }
+
+            if (needNewBuffer) {
+                buffers.add(new Buffer());
+                current = buffers.size() - 1;
+            }
+        }
+        ++occupiedSlots;
+        return buffers.get(current).allocate() + current * NO_SLOTS;
+    }
+
+    void deallocate(int slotNum) {
+        buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+        --occupiedSlots;
+
+        if (needShrink()) {
+            shrink();
+        }
+    }
+
+    /**
+     * Shrink policy:
+     * Shrink when the resource under-utilization lasts for a certain amount of time.
+     * TODO Need to figure out which of the policies is better
+     * case1.
+     * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+     * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+     * even if there is nothing to shrink or deallocate.
+     * It doesn't distinguish the deinitialized children from initialized children
+     * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+     * In other words, it doesn't subtract the deinitialized children's slots.
+     * case2.
+     * buffers status : O O x x x x x
+     * However, in the above case, if we subtract the deinitialized children's slots,
+     * needShrink() will return false even if we shrink the buffers at this case.
+     * 
+     * @return
+     */
+    private boolean needShrink() {
+        int size = buffers.size();
+        int usedSlots = occupiedSlots;
+        if (usedSlots == 0) {
+            usedSlots = 1;
+        }
+
+        if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+            if (isShrinkTimerOn) {
+                if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+                    isShrinkTimerOn = false;
+                    return true;
+                }
+            } else {
+                //turn on timer
+                isShrinkTimerOn = true;
+                shrinkTimer = System.currentTimeMillis();
+            }
+        } else {
+            //turn off timer
+            isShrinkTimerOn = false;
+        }
+
+        return false;
+    }
+
+    /**
+     * Shrink() may
+     * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+     * shrink buffers according to the deinitialized children's contiguity status.
+     * It doesn't deinitialze or shrink more than half of children at a time.
+     */
+    private void shrink() {
+        int i;
+        int removeCount = 0;
+        int size = buffers.size();
+        int maxDecreaseCount = size / 2;
+        Buffer buffer;
+
+        //The first buffer never be deinitialized.
+        for (i = 1; i < size; i++) {
+            if (buffers.get(i).isEmpty()) {
+                buffers.get(i).deinitialize();
+            }
+        }
+
+        //remove the empty buffers from the end
+        for (i = size - 1; i >= 1; i--) {
+            buffer = buffers.get(i);
+            if (! buffer.isInitialized()) {
+                buffers.remove(i);
+                if (++removeCount == maxDecreaseCount) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+        
+        //reset allocChild to the first buffer
+        current = 0;
+
+        isShrinkTimerOn = false;
+    }
+
+    @METHODS@
+    
+
+    static class Buffer {
+        private ByteBuffer bb;
+        private int freeSlotNum;
+        private int occupiedSlots; //-1 represents 'deinitialized' state.
+
+        void initialize() {
+            bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+            freeSlotNum = 0;
+            occupiedSlots = 0;
+
+            for (int i = 0; i < NO_SLOTS - 1; i++) {
+                setNextFreeSlot(i, i + 1);
+            }
+            setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+        }
+        
+        public void deinitialize() {
+            bb = null;
+            occupiedSlots = -1;
+        }
+        
+        public boolean isInitialized() {
+            return occupiedSlots >= 0;
+        }
+
+        public boolean isFull() {
+            return occupiedSlots == NO_SLOTS;
+        }
+
+        public boolean isEmpty() {
+            return occupiedSlots == 0;
+        }
+        
+        public int allocate() {
+            int currentSlot = freeSlotNum;
+            freeSlotNum = getNextFreeSlot(currentSlot);
+            @INIT_SLOT@
+            occupiedSlots++;
+            if (LockManager.IS_DEBUG_MODE) {
+                System.out.println(Thread.currentThread().getName()
+                                   + " allocate: " + currentSlot);
+            }
+            return currentSlot;
+        }
+    
+        public void deallocate(int slotNum) {
+            setNextFreeSlot(slotNum, freeSlotNum);
+            freeSlotNum = slotNum;
+            occupiedSlots--;
+            if (LockManager.IS_DEBUG_MODE) {
+                System.out.println(Thread.currentThread().getName()
+                                   + " deallocate: " + slotNum);
+            }
+        }
+
+        public int getNextFreeSlot(int slotNum) {
+            return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET);
+        }    
+            
+        public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+            bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET, nextFreeSlot);
+        }
+    }
+    
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 78bca42..28af321 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -30,16 +30,12 @@
     }
 
     public static class LockManagerConstants {
-        public static final String LOCK_CONF_DIR = "lock_conf";
-        public static final String LOCK_CONF_FILE = "lock.conf";
-        public static final int[] LOCK_CONFLICT_MATRIX = new int[] { 2, 3 };
-        public static final int[] LOCK_CONVERT_MATRIX = new int[] { 2, 0 };
-
         public static class LockMode {
-            public static final byte S = 0;
-            public static final byte X = 1;
-            public static final byte IS = 2;
-            public static final byte IX = 3;
+            public static final byte NL = 0;
+            public static final byte IS = 1;
+            public static final byte IX = 2;
+            public static final byte S  = 3;
+            public static final byte X  = 4;
         }
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index aceeb82..4cb5fac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,7 +22,7 @@
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.locking.ConcurrentLockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
 import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -46,7 +46,7 @@
         this.id = id;
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
-        this.lockManager = new LockManager(this);
+        this.lockManager = new ConcurrentLockManager(this);
         this.logManager = new LogManager(this);
         this.recoveryManager = new RecoveryManager(this);
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;