work in progress
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
new file mode 100644
index 0000000..332167a
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ArenaManager.java.txt
@@ -0,0 +1,57 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class @E@ArenaManager {
+
+ private ArrayList<@E@MemoryManager> arenas;
+ private volatile int nextArena;
+ private ThreadLocal<LocalManager> local;
+
+ public @E@ArenaManager() {
+ int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+ arenas = new ArrayList<@E@MemoryManager>(noArenas);
+ local = new ThreadLocal<LocalManager>() {
+ @Override
+ protected LocalManager initialValue() {
+ return getNext();
+ }
+ };
+ }
+
+ public static int arenaId(int i) {
+ return (i >> 24) & 0xff;
+ }
+
+ public static int localId(int i) {
+ return i & 0xffffff;
+ }
+
+ public synchronized LocalManager getNext() {
+ @E@MemoryManager mgr = arenas.get(nextArena);
+ if (mgr == null) {
+ mgr = new @E@MemoryManager();
+ arenas.set(nextArena, mgr);
+ }
+ LocalManager res = new LocalManager();
+ res.mgr = mgr;
+ res.arenaId = nextArena;
+ nextArena = (nextArena + 1) % arenas.size();
+ return res;
+ }
+
+ public @E@MemoryManager get(int i) {
+ return arenas.get(i);
+ }
+
+ public @E@MemoryManager local() {
+ return local.get().mgr;
+ }
+
+ @METHODS@
+
+ static class LocalManager {
+ int arenaId;
+ @E@MemoryManager mgr;
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
new file mode 100644
index 0000000..7abb71b
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -0,0 +1,761 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+/**
+ * An implementation of the ILockManager interface for the
+ * specific case of locking protocol with two lock modes: (S) and (X),
+ * where S lock mode is shown by 0, and X lock mode is shown by 1.
+ *
+ * @author tillw
+ */
+
+public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
+
+ public static final boolean IS_DEBUG_MODE = false;//true
+
+ private TransactionSubsystem txnSubsystem;
+
+ private ResourceGroupTable table;
+ private ResourceArenaManager resArenaMgr;
+ private RequestArenaManager reqArenaMgr;
+ private ConcurrentHashMap<Integer, Integer> jobReqMap; // TODO different impl
+
+ enum LockAction {
+ GET,
+ UPD, // special version of GET that updates the max lock mode
+ WAIT
+ }
+
+ static LockAction[][] ACTION_MATRIX = {
+ // new NL IS IX S X
+ { LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
+ { LockAction.WAIT, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
+ { LockAction.WAIT, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
+ { LockAction.WAIT, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
+ { LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+ };
+
+ public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+ this.txnSubsystem = txnSubsystem;
+
+ this.table = new ResourceGroupTable();
+
+ final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties()
+ .getLockManagerShrinkTimer();
+
+ resArenaMgr = new ResourceArenaManager();
+ reqArenaMgr = new RequestArenaManager();
+ jobReqMap = new ConcurrentHashMap<>();
+ }
+
+ public AsterixTransactionProperties getTransactionProperties() {
+ return this.txnSubsystem.getTransactionProperties();
+ }
+
+ @Override
+ public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+
+ if (entityHashValue != -1) {
+ // get the intention lock on the dataset, if we want to lock an individual item
+ byte dsLockMode = lockMode == LockMode.X ? LockMode.IX : LockMode.IS;
+ lock(datasetId, -1, dsLockMode, txnContext);
+ }
+
+ int dsId = datasetId.getId();
+
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+
+ // 1) Find the resource in the hash table
+
+ int resSlot = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resSlot == -1) {
+ // we don't know about this resource, let's alloc a slot
+ ResourceMemoryManager resMgr = resArenaMgr.local();
+ resSlot = resMgr.allocate();
+ resMgr.setDatasetId(resSlot, datasetId.getId());
+ resMgr.setPkHashVal(resSlot, entityHashValue);
+
+ if (group.firstResourceIndex.get() == -1) {
+ group.firstResourceIndex.set(resSlot);
+ }
+ }
+
+ // 2) create a request entry
+
+ int jobId = txnContext.getJobId().getId();
+ int reqSlot = reqArenaMgr.local().allocate();
+ reqArenaMgr.setResourceId(reqSlot, resSlot);
+ reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
+ reqArenaMgr.setJobId(reqSlot, jobId);
+
+ Integer headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+ if (headOfJobReqQueue != null) {
+ // TODO make sure this works (even if the job gets removed from the table)
+ while (!jobReqMap.replace(jobId, headOfJobReqQueue, reqSlot)) {
+ headOfJobReqQueue = jobReqMap.putIfAbsent(jobId, reqSlot);
+ }
+ }
+ // this goes across arenas
+
+ reqArenaMgr.setNextJobRequest(reqSlot, headOfJobReqQueue);
+ reqArenaMgr.setPrevJobRequest(reqSlot, -1);
+ reqArenaMgr.setNextJobRequest(headOfJobReqQueue, reqSlot);
+
+ // 3) check lock compatibility
+
+ boolean locked = false;
+
+ while (! locked) {
+ int curLockMode = resArenaMgr.getMaxMode(resSlot);
+ switch (ACTION_MATRIX[curLockMode][lockMode]) {
+ case UPD:
+ resArenaMgr.setMaxMode(resSlot, lockMode);
+ // no break
+ case GET:
+ addHolderToResource(resSlot, reqSlot);
+ locked = true;
+ break;
+ case WAIT:
+ // TODO can we have more than on upgrader? Or do we need to
+ // abort if we get a second upgrader?
+ if (findLastHolderForJob(resSlot, jobId) != -1) {
+ addUpgraderToResource(resSlot, reqSlot);
+ } else {
+ addWaiterToResource(resSlot, reqSlot);
+ }
+ try {
+ group.await();
+ } catch (InterruptedException e) {
+ throw new ACIDException(txnContext, "interrupted", e);
+ }
+ break;
+ }
+
+ // TODO where do we check for deadlocks?
+ }
+
+ group.releaseLatch();
+
+ //internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
+ }
+
+ @Override
+ public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
+
+ ResourceGroup group = table.get(datasetId, entityHashValue);
+ group.getLatch();
+
+ int dsId = datasetId.getId();
+ int resource = findResourceInGroup(group, dsId, entityHashValue);
+
+ if (resource < 0) {
+ throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
+ }
+
+ int jobId = txnContext.getJobId().getId();
+ // since locking is properly nested, finding the last holder is good enough
+
+ int holder = resArenaMgr.getLastHolder(resource);
+ if (holder < 0) {
+ throw new IllegalStateException("no holder for resource " + resource);
+ }
+
+ // remove from the list of holders
+ if (reqArenaMgr.getJobId(holder) == jobId) {
+ int next = reqArenaMgr.getNextRequest(holder);
+ resArenaMgr.setLastHolder(resource, next);
+ } else {
+ int prev = holder;
+ while (prev != -1) {
+ holder = reqArenaMgr.getNextRequest(prev);
+ if (holder == -1) {
+ throw new IllegalStateException("no holder for resource " + resource + " and job " + jobId);
+ }
+ if (jobId == reqArenaMgr.getJobId(holder)) {
+ break;
+ }
+ prev = holder;
+ }
+ int next = reqArenaMgr.getNextRequest(holder);
+ reqArenaMgr.setNextRequest(prev, next);
+ }
+
+ // remove from the list of requests for a job
+ int prevForJob = reqArenaMgr.getPrevJobRequest(holder);
+ int nextForJob = reqArenaMgr.getNextJobRequest(holder);
+ if (prevForJob == -1) {
+ if (nextForJob == -1) {
+ jobReqMap.remove(jobId);
+ } else {
+ jobReqMap.put(jobId, nextForJob);
+ }
+ } else {
+ reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
+ }
+ if (nextForJob != -1) {
+ reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
+ }
+
+ // deallocate request
+ reqArenaMgr.local().deallocate(holder);
+ // deallocate resource or fix max lock mode
+ if (resourceNotUsed(resource)) {
+ int prev = group.firstResourceIndex.get();
+ if (prev == resource) {
+ group.firstResourceIndex.set(resArenaMgr.getNext(resource));
+ } else {
+ while (resArenaMgr.getNext(prev) != resource) {
+ prev = resArenaMgr.getNext(prev);
+ }
+ resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
+ }
+ resArenaMgr.local().deallocate(resource);
+ } else {
+ final int oldMaxMode = resArenaMgr.getMaxMode(resource);
+ final int newMaxMode = getNewMaxMode(resource, oldMaxMode);
+ resArenaMgr.setMaxMode(resource, newMaxMode);
+ if (oldMaxMode != newMaxMode) {
+ // the locking mode didn't change, current waiters won't be
+ // able to acquire the lock, so we do not need to signal them
+ group.wakeUp();
+ }
+ }
+ group.releaseLatch();
+
+ // finally remove the intention lock as well
+ if (entityHashValue != -1) {
+ // remove the intention lock on the dataset, if we want to lock an individual item
+ unlock(datasetId, -1, txnContext);
+ }
+
+ //internalUnlock(datasetId, entityHashValue, txnContext, false, false);
+ }
+
+ @Override
+ public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+ int jobId = txnContext.getJobId().getId();
+ Integer head = jobReqMap.get(jobId);
+ while (head != null) {
+ int resource = reqArenaMgr.getResourceId(head);
+ int dsId = resArenaMgr.getDatasetId(resource);
+ int pkHashVal = resArenaMgr.getPkHashVal(resource);
+ unlock(new DatasetId(dsId), pkHashVal, txnContext);
+ head = jobReqMap.get(jobId);
+ }
+ }
+
+ private int findLastHolderForJob(int resource, int job) {
+ int holder = resArenaMgr.getLastHolder(resource);
+ while (holder != -1) {
+ if (job == reqArenaMgr.getJobId(holder)) {
+ return holder;
+ }
+ holder = reqArenaMgr.getNextRequest(holder);
+ }
+ return -1;
+ }
+
+ private int findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+ int resSlot = group.firstResourceIndex.get();
+ // we're looking in the local one as that should be set correctly // TODO make sure!
+ ResourceMemoryManager resMgr = resArenaMgr.local();
+ while (resSlot != -1) {
+ // either we already have a lock on this resource or we have a
+ // hash collision
+ if (resMgr.getDatasetId(resSlot) == dsId &&
+ resMgr.getPkHashVal(resSlot) == entityHashValue) {
+ return resSlot;
+ } else {
+ resSlot = resMgr.getNext(resSlot);
+ }
+ }
+ return -1;
+ }
+
+ private void addHolderToResource(int resource, int request) {
+ final int lastHolder = resArenaMgr.getLastHolder(resource);
+ reqArenaMgr.setNextRequest(request, lastHolder);
+ resArenaMgr.setLastHolder(resource, request);
+ }
+
+ private void addWaiterToResource(int resource, int request) {
+ int waiter = resArenaMgr.getFirstWaiter(resource);
+ reqArenaMgr.setNextRequest(request, -1);
+ if (waiter == -1) {
+ resArenaMgr.setFirstWaiter(resource, request);
+ } else {
+ appendToRequestQueue(waiter, request);
+ }
+ }
+
+ private void addUpgraderToResource(int resource, int request) {
+ int upgrader = resArenaMgr.getFirstUpgrader(resource);
+ reqArenaMgr.setNextRequest(request, -1);
+ if (upgrader == -1) {
+ resArenaMgr.setFirstUpgrader(resource, request);
+ } else {
+ appendToRequestQueue(upgrader, request);
+ }
+ }
+
+ private void appendToRequestQueue(int head, int appendee) {
+ int next = reqArenaMgr.getNextRequest(head);
+ while(next != -1) {
+ head = next;
+ next = reqArenaMgr.getNextRequest(head);
+ }
+ reqArenaMgr.setNextRequest(head, appendee);
+ }
+
+ private int getNewMaxMode(int resource, int oldMaxMode) {
+ int newMaxMode = LockMode.NL;
+ int holder = resArenaMgr.getLastHolder(resource);
+ while (holder != -1) {
+ int curLockMode = reqArenaMgr.getLockMode(holder);
+ if (curLockMode == oldMaxMode) {
+ // we have another lock of the same mode - we're done
+ return oldMaxMode;
+ }
+ switch (ACTION_MATRIX[newMaxMode][curLockMode]) {
+ case UPD:
+ newMaxMode = curLockMode;
+ break;
+ case GET:
+ break;
+ case WAIT:
+ throw new IllegalStateException("incompatible locks in holder queue");
+ }
+ }
+ return newMaxMode;
+ }
+
+ private boolean resourceNotUsed(int resource) {
+ return resArenaMgr.getLastHolder(resource) == -1
+ && resArenaMgr.getFirstUpgrader(resource) == -1
+ && resArenaMgr.getFirstWaiter(resource) == -1;
+ }
+
+
+
+ private void validateJob(ITransactionContext txnContext) throws ACIDException {
+ if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
+ throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
+ } else if (txnContext.isTimeout()) {
+ requestAbort(txnContext);
+ }
+ }
+
+ //@Override
+ public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+ throws ACIDException {
+ throw new IllegalStateException();
+ //internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+ }
+
+ private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+ throws ACIDException {
+ throw new IllegalStateException();
+ //internalUnlock(datasetId, entityHashValue, txnContext, true, false);
+ }
+
+ @Override
+ public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+
+ // try {
+ // internalLock(datasetId, entityHashValue, lockMode, txnContext);
+ // return;
+ // } finally {
+ // unlock(datasetId, entityHashValue, txnContext);
+ // }
+ throw new IllegalStateException();
+ //internalLock(datasetId, entityHashValue, lockMode, txnContext, true);
+ //instantUnlock(datasetId, entityHashValue, txnContext);
+ }
+
+ @Override
+ public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
+ throw new IllegalStateException();
+ //return internalTryLock(datasetId, entityHashValue, lockMode, txnContext, false);
+ }
+
+ @Override
+ public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+ ITransactionContext txnContext) throws ACIDException {
+ throw new IllegalStateException();
+ //return internalInstantTryLock(datasetId, entityHashValue, lockMode, txnContext);
+ }
+
+ private void trackLockRequest(String msg, int requestType, DatasetId datasetIdObj, int entityHashValue,
+ byte lockMode, ITransactionContext txnContext, DatasetLockInfo dLockInfo, int eLockInfo) {
+/*
+ StringBuilder s = new StringBuilder();
+ LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, datasetIdObj,
+ entityHashValue, lockMode, txnContext);
+ s.append(Thread.currentThread().getId() + ":");
+ s.append(msg);
+ if (msg.equals("Granted")) {
+ if (dLockInfo != null) {
+ s.append("\t|D| ");
+ s.append(dLockInfo.getIXCount()).append(",");
+ s.append(dLockInfo.getISCount()).append(",");
+ s.append(dLockInfo.getXCount()).append(",");
+ s.append(dLockInfo.getSCount()).append(",");
+ if (dLockInfo.getFirstUpgrader() != -1) {
+ s.append("+");
+ } else {
+ s.append("-");
+ }
+ s.append(",");
+ if (dLockInfo.getFirstWaiter() != -1) {
+ s.append("+");
+ } else {
+ s.append("-");
+ }
+ }
+
+ if (eLockInfo != -1) {
+ s.append("\t|E| ");
+ s.append(entityLockInfoManager.getXCount(eLockInfo)).append(",");
+ s.append(entityLockInfoManager.getSCount(eLockInfo)).append(",");
+ if (entityLockInfoManager.getUpgrader(eLockInfo) != -1) {
+ s.append("+");
+ } else {
+ s.append("-");
+ }
+ s.append(",");
+ if (entityLockInfoManager.getFirstWaiter(eLockInfo) != -1) {
+ s.append("+");
+ } else {
+ s.append("-");
+ }
+ }
+ }
+
+ lockRequestTracker.addEvent(s.toString(), request);
+ if (msg.equals("Requested")) {
+ lockRequestTracker.addRequest(request);
+ }
+ System.out.println(request.prettyPrint() + "--> " + s.toString());
+*/
+ }
+
+ public String getHistoryForAllJobs() {
+/* if (IS_DEBUG_MODE) {
+ return lockRequestTracker.getHistoryForAllJobs();
+ }
+*/
+ return null;
+ }
+
+ public String getHistoryPerJob() {
+/* if (IS_DEBUG_MODE) {
+ return lockRequestTracker.getHistoryPerJob();
+ }
+*/
+ return null;
+ }
+
+ public String getRequestHistoryForAllJobs() {
+/* if (IS_DEBUG_MODE) {
+ return lockRequestTracker.getRequestHistoryForAllJobs();
+ }
+*/
+ return null;
+ }
+
+ private void requestAbort(ITransactionContext txnContext) throws ACIDException {
+ txnContext.setTimeout(true);
+ throw new ACIDException("Transaction " + txnContext.getJobId()
+ + " should abort (requested by the Lock Manager)");
+ }
+
+ /**
+ * For now, upgrading lock granule from entity-granule to dataset-granule is not supported!!
+ *
+ * @param fromLockMode
+ * @param toLockMode
+ * @return
+ */
+ private boolean isLockUpgrade(byte fromLockMode, byte toLockMode) {
+ return fromLockMode == LockMode.S && toLockMode == LockMode.X;
+ }
+
+ @Override
+ public String prettyPrint() throws ACIDException {
+ StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
+ return s + "\n";
+ }
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+
+ //#. dump Configurable Variables
+ dumpConfVars(os);
+
+ //#. dump jobHT
+ dumpJobInfo(os);
+
+ //#. dump datasetResourceHT
+ dumpDatasetLockInfo(os);
+
+ //#. dump entityLockInfoManager
+ dumpEntityLockInfo(os);
+
+ //#. dump entityInfoManager
+ dumpEntityInfo(os);
+
+ //#. dump lockWaiterManager
+
+ dumpLockWaiterInfo(os);
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpConfVars(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: "
+ + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
+// sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+// + entityLockInfoManager.getShrinkTimerThreshold());
+// sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
+// sb.append("\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): " + lockWaiterManager.getShrinkTimerThreshold());
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpJobInfo(OutputStream os) {
+ JobId jobId;
+ JobInfo jobInfo;
+ StringBuilder sb = new StringBuilder();
+/*
+ try {
+ sb.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+ Set<Map.Entry<JobId, JobInfo>> entrySet = jobHT.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<JobId, JobInfo> entry : entrySet) {
+ if (entry != null) {
+ jobId = entry.getKey();
+ if (jobId != null) {
+ sb.append("\n" + jobId);
+ } else {
+ sb.append("\nJID:null");
+ }
+
+ jobInfo = entry.getValue();
+ if (jobInfo != null) {
+ sb.append(jobInfo.coreDump());
+ } else {
+ sb.append("\nJobInfo:null");
+ }
+ }
+ }
+ }
+ sb.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+*/
+ }
+
+ private void dumpDatasetLockInfo(OutputStream os) {
+/*
+ DatasetId datasetId;
+ DatasetLockInfo datasetLockInfo;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+ Set<Map.Entry<DatasetId, DatasetLockInfo>> entrySet = datasetResourceHT.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<DatasetId, DatasetLockInfo> entry : entrySet) {
+ if (entry != null) {
+ datasetId = entry.getKey();
+ if (datasetId != null) {
+ sb.append("\nDatasetId:" + datasetId.getId());
+ } else {
+ sb.append("\nDatasetId:null");
+ }
+
+ datasetLockInfo = entry.getValue();
+ if (datasetLockInfo != null) {
+ sb.append(datasetLockInfo.coreDump());
+ } else {
+ sb.append("\nDatasetLockInfo:null");
+ }
+ }
+ sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+ os.write(sb.toString().getBytes());
+
+ //create a new sb to avoid possible OOM exception
+ sb = new StringBuilder();
+ }
+ }
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+*/
+ }
+
+ private void dumpEntityLockInfo(OutputStream os) {
+/*
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [EntityLockInfo] -----");
+ entityLockInfoManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [EntityLockInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+*/
+ }
+
+ private void dumpEntityInfo(OutputStream os) {
+/*
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [EntityInfo] -----");
+ entityInfoManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [EntityInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+*/
+ }
+
+ private void dumpLockWaiterInfo(OutputStream os) {
+/*
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [LockWaiterInfo] -----");
+ lockWaiterManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+*/
+ }
+
+ private static class ResourceGroupTable {
+ public static final int TABLE_SIZE = 10; // TODO increase
+
+ private ResourceGroup[] table;
+
+ public ResourceGroupTable() {
+ for (int i = 0; i < TABLE_SIZE; ++i) {
+ table[i] = new ResourceGroup();
+ }
+ }
+
+ ResourceGroup get(DatasetId dId, int entityHashValue) {
+ // TODO ensure good properties of hash function
+ int h = dId.getId() ^ entityHashValue;
+ return table[h % TABLE_SIZE];
+ }
+ }
+
+ private static class ResourceGroup {
+ private ReentrantReadWriteLock latch;
+ private Condition condition;
+ AtomicInteger firstResourceIndex;
+
+ ResourceGroup() {
+ latch = new ReentrantReadWriteLock();
+ condition = latch.writeLock().newCondition();
+ firstResourceIndex = new AtomicInteger(-1);
+ }
+
+ void getLatch() {
+ latch.writeLock().lock();
+ }
+
+ void releaseLatch() {
+ latch.writeLock().unlock();
+ }
+
+ boolean hasWaiters() {
+ return latch.hasQueuedThreads();
+ }
+
+ void await() throws InterruptedException {
+ condition.await();
+ }
+
+ void wakeUp() {
+ condition.signalAll();
+ }
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
new file mode 100644
index 0000000..c530883
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/Generator.java
@@ -0,0 +1,116 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import edu.uci.ics.asterix.transaction.management.service.locking.RecordType.Field;
+
+public class Generator {
+ public static void main(String args[]) {
+
+ RecordType resource = new RecordType("Resource");
+ resource.addField("last holder", RecordType.Type.INT, "-1");
+ resource.addField("first waiter", RecordType.Type.INT, "-1");
+ resource.addField("first upgrader", RecordType.Type.INT, "-1");
+ resource.addField("max mode", RecordType.Type.INT, "LockMode.NL");
+ resource.addField("dataset id", RecordType.Type.INT, null);
+ resource.addField("pk hash val", RecordType.Type.INT, null);
+ resource.addField("next", RecordType.Type.INT, null);
+
+ RecordType request = new RecordType("Request");
+ request.addField("resource id", RecordType.Type.INT, null);
+ request.addField("lock mode", RecordType.Type.INT, null);
+ request.addField("job id", RecordType.Type.INT, null);
+ request.addField("prev job request", RecordType.Type.INT, null);
+ request.addField("next job request", RecordType.Type.INT, null);
+ request.addField("next request", RecordType.Type.INT, null);
+
+
+ StringBuilder sb = new StringBuilder();
+
+ //generateMemoryManagerSource(request, sb);
+ //generateMemoryManagerSource(resource, sb);
+ generateArenaManagerSource(request, sb);
+ //generateArenaManagerSource(resource, sb);
+
+ System.out.println(sb.toString());
+ }
+
+ private static void generateMemoryManagerSource(RecordType resource, StringBuilder sb) {
+ InputStream is = resource.getClass().getResourceAsStream("StructuredMemoryManager.java.txt");
+ if (is == null) {
+ throw new IllegalStateException();
+ }
+ BufferedReader in = new BufferedReader(new InputStreamReader(is));
+ String line = null;
+
+ try {
+
+ String indent = " ";
+
+ while((line = in.readLine()) != null) {
+ if (line.contains("@E@")) {
+ line = line.replace("@E@", resource.name);
+ }
+ if (line.contains("@CONSTS@")) {
+ resource.appendConstants(sb, indent, 1);
+ sb.append('\n');
+ } else if (line.contains("@METHODS@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendMemoryManagerGetMethod(sb, indent, 1);
+ sb.append('\n');
+ field.appendMemoryManagerSetMethod(sb, indent, 1);
+ sb.append('\n');
+ }
+ } else if (line.contains("@INIT_SLOT@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendInitializers(sb, indent, 1);
+ }
+ } else {
+ sb.append(line).append('\n');
+ }
+ }
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+
+ private static void generateArenaManagerSource(RecordType resource, StringBuilder sb) {
+ InputStream is = resource.getClass().getResourceAsStream("ArenaManager.java.txt");
+ if (is == null) {
+ throw new IllegalStateException();
+ }
+ BufferedReader in = new BufferedReader(new InputStreamReader(is));
+ String line = null;
+
+ try {
+
+ String indent = " ";
+
+ while((line = in.readLine()) != null) {
+ if (line.contains("@E@")) {
+ line = line.replace("@E@", resource.name);
+ }
+ if (line.contains("@METHODS@")) {
+ for (int i = 0; i < resource.size(); ++i) {
+ final Field field = resource.fields.get(i);
+ field.appendArenaManagerGetMethod(sb, indent, 1);
+ sb.append('\n');
+ field.appendArenaManagerSetMethod(sb, indent, 1);
+ sb.append('\n');
+ }
+ } else {
+ sb.append(line).append('\n');
+ }
+ }
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
new file mode 100644
index 0000000..dc0f252
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RecordType.java
@@ -0,0 +1,235 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class RecordType {
+
+ enum Type {
+ BYTE,
+ SHORT,
+ INT
+ }
+
+ static class Field {
+
+ String name;
+ Type type;
+ String initial;
+ int offset;
+
+ Field(String name, Type type, String initial, int offset) {
+ this.name = name;
+ this.type = type;
+ this.initial = initial;
+ this.offset = offset;
+ }
+
+ String methodName(String prefix) {
+ String words[] = name.split(" ");
+ assert(words.length > 0);
+ StringBuilder sb = new StringBuilder(prefix);
+ for (int j = 0; j < words.length; ++j) {
+ String word = words[j];
+ sb.append(word.substring(0, 1).toUpperCase());
+ sb.append(word.substring(1));
+ }
+ return sb.toString();
+ }
+
+ StringBuilder appendMemoryManagerGetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public ")
+ .append(name(type))
+ .append(' ')
+ .append(methodName("get"))
+ .append("(int slotNum) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("return b.")
+ .append(bbGetter(type))
+ .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(");\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendMemoryManagerSetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public void ")
+ .append(methodName("set"))
+ .append("(int slotNum, ")
+ .append(name(type))
+ .append(" value) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append(" b.")
+ .append(bbSetter(type))
+ .append("((slotNum % NO_SLOTS) * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", value);\n");
+ sb = indent(sb, indent, level);
+ sb.append(indent)
+ .append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerSetThreadLocal(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("final int arenaId = arenaId(slotNum);\n");
+ sb = indent(sb, indent, level);
+ sb.append("if (arenaId != local.get().arenaId) {\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("local.get().arenaId = arenaId;\n");
+ sb = indent(sb, indent, level + 1);
+ sb.append("local.get().mgr = get(arenaId);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerGetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public ")
+ .append(name(type))
+ .append(' ')
+ .append(methodName("get"))
+ .append("(int slotNum) {\n");
+ sb = appendArenaManagerSetThreadLocal(sb, indent, level + 1);
+ sb = indent(sb, indent, level + 1);
+ sb.append("return local.get().mgr.")
+ .append(methodName("get"))
+ .append("(localId(slotNum));\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendArenaManagerSetMethod(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public void ")
+ .append(methodName("set"))
+ .append("(int slotNum, ")
+ .append(name(type))
+ .append(" value) {\n");
+ sb = appendArenaManagerSetThreadLocal(sb, indent, level + 1);
+ sb = indent(sb, indent, level + 1);
+ sb.append("local.get().mgr.")
+ .append(methodName("set"))
+ .append("(localId(slotNum), value);\n");
+ sb = indent(sb, indent, level);
+ sb.append("}\n");
+ return sb;
+ }
+
+ StringBuilder appendInitializers(StringBuilder sb, String indent, int level) {
+ if (initial != null) {
+ sb = indent(sb, indent, level);
+ sb.append("bb.")
+ .append(bbSetter(type))
+ .append("(currentSlot * ITEM_SIZE + ")
+ .append(offsetName())
+ .append(", ")
+ .append(initial)
+ .append(");\n");
+ }
+ return sb;
+ }
+
+ String offsetName() {
+ String words[] = name.split(" ");
+ assert(words.length > 0);
+ StringBuilder sb = new StringBuilder(words[0].toUpperCase());
+ for (int j = 1; j < words.length; ++j) {
+ sb.append("_").append(words[j].toUpperCase());
+ }
+ sb.append("_OFF");
+ return sb.toString();
+ }
+
+ int offset() {
+ return offset;
+ }
+ }
+
+ String name;
+ ArrayList<Field> fields;
+ int totalSize;
+
+ static StringBuilder indent(StringBuilder sb, String indent, int level) {
+ for (int i = 0; i < level; ++i) {
+ sb.append(indent);
+ }
+ return sb;
+ }
+
+ RecordType(String name) {
+ this.name = name;
+ fields = new ArrayList<Field>();
+ totalSize = 0;
+ }
+
+ void addField(String name, Type type, String initial) {
+ fields.add(new Field(name, type, initial, totalSize));
+ totalSize += size(type);
+ }
+
+ int size() {
+ return fields.size();
+ }
+
+ static int size(Type t) {
+ switch(t) {
+ case BYTE: return 1;
+ case SHORT: return 2;
+ case INT: return 4;
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String name(Type t) {
+ switch(t) {
+ case BYTE: return "byte";
+ case SHORT: return "short";
+ case INT: return "int";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String bbGetter(Type t) {
+ switch(t) {
+ case BYTE: return "get";
+ case SHORT: return "getShort";
+ case INT: return "getInt";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ static String bbSetter(Type t) {
+ switch(t) {
+ case BYTE: return "put";
+ case SHORT: return "putShort";
+ case INT: return "putInt";
+ default: throw new IllegalArgumentException();
+ }
+ }
+
+ StringBuilder appendConstants(StringBuilder sb, String indent, int level) {
+ sb = indent(sb, indent, level);
+ sb.append("public static int ITEM_SIZE = ")
+ .append(totalSize)
+ .append(";\n");
+ for (int i = 0; i < fields.size(); ++i) {
+ final Field field = fields.get(i);
+ sb = indent(sb, indent, level);
+ sb.append("public static int ")
+ .append(field.offsetName())
+ .append(" = ")
+ .append(field.offset).append(";\n");
+ }
+ return sb;
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
new file mode 100644
index 0000000..aea1f47
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestArenaManager.java
@@ -0,0 +1,165 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class RequestArenaManager {
+
+ private ArrayList<RequestMemoryManager> arenas;
+ private volatile int nextArena;
+ private ThreadLocal<LocalManager> local;
+
+ public RequestArenaManager() {
+ int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+ arenas = new ArrayList<RequestMemoryManager>(noArenas);
+ local = new ThreadLocal<LocalManager>() {
+ @Override
+ protected LocalManager initialValue() {
+ return getNext();
+ }
+ };
+ }
+
+ public static int arenaId(int i) {
+ return (i >> 24) & 0xff;
+ }
+
+ public static int localId(int i) {
+ return i & 0xffffff;
+ }
+
+ public synchronized LocalManager getNext() {
+ RequestMemoryManager mgr = arenas.get(nextArena);
+ if (mgr == null) {
+ mgr = new RequestMemoryManager();
+ arenas.set(nextArena, mgr);
+ }
+ LocalManager res = new LocalManager();
+ res.mgr = mgr;
+ res.arenaId = nextArena;
+ nextArena = (nextArena + 1) % arenas.size();
+ return res;
+ }
+
+ public RequestMemoryManager get(int i) {
+ return arenas.get(i);
+ }
+
+ public RequestMemoryManager local() {
+ return local.get().mgr;
+ }
+
+ public int getResourceId(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getResourceId(localId(slotNum));
+ }
+
+ public void setResourceId(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setResourceId(localId(slotNum), value);
+ }
+
+ public int getLockMode(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getLockMode(localId(slotNum));
+ }
+
+ public void setLockMode(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setLockMode(localId(slotNum), value);
+ }
+
+ public int getJobId(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getJobId(localId(slotNum));
+ }
+
+ public void setJobId(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setJobId(localId(slotNum), value);
+ }
+
+ public int getPrevJobRequest(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getPrevJobRequest(localId(slotNum));
+ }
+
+ public void setPrevJobRequest(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setPrevJobRequest(localId(slotNum), value);
+ }
+
+ public int getNextJobRequest(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getNextJobRequest(localId(slotNum));
+ }
+
+ public void setNextJobRequest(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setNextJobRequest(localId(slotNum), value);
+ }
+
+ public int getNextRequest(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getNextRequest(localId(slotNum));
+ }
+
+ public void setNextRequest(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setNextRequest(localId(slotNum), value);
+ }
+
+
+ static class LocalManager {
+ int arenaId;
+ RequestMemoryManager mgr;
+ }
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
new file mode 100644
index 0000000..df139b5
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/RequestMemoryManager.java
@@ -0,0 +1,273 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class RequestMemoryManager {
+
+ public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
+
+ static final int NO_SLOTS = 10;
+ static final int NEXT_FREE_SLOT_OFFSET = 0;
+
+ public static int ITEM_SIZE = 24;
+ public static int RESOURCE_ID_OFF = 0;
+ public static int LOCK_MODE_OFF = 4;
+ public static int JOB_ID_OFF = 8;
+ public static int PREV_JOB_REQUEST_OFF = 12;
+ public static int NEXT_JOB_REQUEST_OFF = 16;
+ public static int NEXT_REQUEST_OFF = 20;
+
+
+ private ArrayList<Buffer> buffers;
+ private int current;
+ private int occupiedSlots;
+ private long shrinkTimer;
+ private boolean isShrinkTimerOn;
+
+ public int allocate() {
+ if (buffers.get(current).isFull()) {
+ int size = buffers.size();
+ boolean needNewBuffer = true;
+ Buffer buffer;
+
+ for (int i = 0; i < size; i++) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffer.initialize();
+ current = i;
+ needNewBuffer = false;
+ break;
+ }
+ }
+
+ if (needNewBuffer) {
+ buffers.add(new Buffer());
+ current = buffers.size() - 1;
+ }
+ }
+ ++occupiedSlots;
+ return buffers.get(current).allocate() + current * NO_SLOTS;
+ }
+
+ void deallocate(int slotNum) {
+ buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+ --occupiedSlots;
+
+ if (needShrink()) {
+ shrink();
+ }
+ }
+
+ /**
+ * Shrink policy:
+ * Shrink when the resource under-utilization lasts for a certain amount of time.
+ * TODO Need to figure out which of the policies is better
+ * case1.
+ * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+ * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+ * even if there is nothing to shrink or deallocate.
+ * It doesn't distinguish the deinitialized children from initialized children
+ * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+ * In other words, it doesn't subtract the deinitialized children's slots.
+ * case2.
+ * buffers status : O O x x x x x
+ * However, in the above case, if we subtract the deinitialized children's slots,
+ * needShrink() will return false even if we shrink the buffers at this case.
+ *
+ * @return
+ */
+ private boolean needShrink() {
+ int size = buffers.size();
+ int usedSlots = occupiedSlots;
+ if (usedSlots == 0) {
+ usedSlots = 1;
+ }
+
+ if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+ if (isShrinkTimerOn) {
+ if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+ isShrinkTimerOn = false;
+ return true;
+ }
+ } else {
+ //turn on timer
+ isShrinkTimerOn = true;
+ shrinkTimer = System.currentTimeMillis();
+ }
+ } else {
+ //turn off timer
+ isShrinkTimerOn = false;
+ }
+
+ return false;
+ }
+
+ /**
+ * Shrink() may
+ * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+ * shrink buffers according to the deinitialized children's contiguity status.
+ * It doesn't deinitialze or shrink more than half of children at a time.
+ */
+ private void shrink() {
+ int i;
+ int removeCount = 0;
+ int size = buffers.size();
+ int maxDecreaseCount = size / 2;
+ Buffer buffer;
+
+ //The first buffer never be deinitialized.
+ for (i = 1; i < size; i++) {
+ if (buffers.get(i).isEmpty()) {
+ buffers.get(i).deinitialize();
+ }
+ }
+
+ //remove the empty buffers from the end
+ for (i = size - 1; i >= 1; i--) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffers.remove(i);
+ if (++removeCount == maxDecreaseCount) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ //reset allocChild to the first buffer
+ current = 0;
+
+ isShrinkTimerOn = false;
+ }
+
+ public int getResourceId(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + RESOURCE_ID_OFF);
+ }
+
+ public void setResourceId(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + RESOURCE_ID_OFF, value);
+ }
+
+ public int getLockMode(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + LOCK_MODE_OFF);
+ }
+
+ public void setLockMode(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LOCK_MODE_OFF, value);
+ }
+
+ public int getJobId(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + JOB_ID_OFF);
+ }
+
+ public void setJobId(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + JOB_ID_OFF, value);
+ }
+
+ public int getPrevJobRequest(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + PREV_JOB_REQUEST_OFF);
+ }
+
+ public void setPrevJobRequest(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PREV_JOB_REQUEST_OFF, value);
+ }
+
+ public int getNextJobRequest(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_JOB_REQUEST_OFF);
+ }
+
+ public void setNextJobRequest(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_JOB_REQUEST_OFF, value);
+ }
+
+ public int getNextRequest(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_REQUEST_OFF);
+ }
+
+ public void setNextRequest(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_REQUEST_OFF, value);
+ }
+
+
+
+ static class Buffer {
+ private ByteBuffer bb;
+ private int freeSlotNum;
+ private int occupiedSlots; //-1 represents 'deinitialized' state.
+
+ void initialize() {
+ bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+ freeSlotNum = 0;
+ occupiedSlots = 0;
+
+ for (int i = 0; i < NO_SLOTS - 1; i++) {
+ setNextFreeSlot(i, i + 1);
+ }
+ setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+ }
+
+ public void deinitialize() {
+ bb = null;
+ occupiedSlots = -1;
+ }
+
+ public boolean isInitialized() {
+ return occupiedSlots >= 0;
+ }
+
+ public boolean isFull() {
+ return occupiedSlots == NO_SLOTS;
+ }
+
+ public boolean isEmpty() {
+ return occupiedSlots == 0;
+ }
+
+ public int allocate() {
+ int currentSlot = freeSlotNum;
+ freeSlotNum = getNextFreeSlot(currentSlot);
+ occupiedSlots++;
+ if (LockManager.IS_DEBUG_MODE) {
+ System.out.println(Thread.currentThread().getName()
+ + " allocate: " + currentSlot);
+ }
+ return currentSlot;
+ }
+
+ public void deallocate(int slotNum) {
+ setNextFreeSlot(slotNum, freeSlotNum);
+ freeSlotNum = slotNum;
+ occupiedSlots--;
+ if (LockManager.IS_DEBUG_MODE) {
+ System.out.println(Thread.currentThread().getName()
+ + " deallocate: " + slotNum);
+ }
+ }
+
+ public int getNextFreeSlot(int slotNum) {
+ return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET);
+ }
+
+ public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+ bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET, nextFreeSlot);
+ }
+ }
+
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
new file mode 100644
index 0000000..b7bceca
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceArenaManager.java
@@ -0,0 +1,183 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.ArrayList;
+
+public class ResourceArenaManager {
+
+ private ArrayList<ResourceMemoryManager> arenas;
+ private volatile int nextArena;
+ private ThreadLocal<LocalManager> local;
+
+ public ResourceArenaManager() {
+ int noArenas = Runtime.getRuntime().availableProcessors() * 2;
+ arenas = new ArrayList<ResourceMemoryManager>(noArenas);
+ local = new ThreadLocal<LocalManager>() {
+ @Override
+ protected LocalManager initialValue() {
+ return getNext();
+ }
+ };
+ }
+
+ public static int arenaId(int i) {
+ return (i >> 24) & 0xff;
+ }
+
+ public static int localId(int i) {
+ return i & 0xffffff;
+ }
+
+ public synchronized LocalManager getNext() {
+ ResourceMemoryManager mgr = arenas.get(nextArena);
+ if (mgr == null) {
+ mgr = new ResourceMemoryManager();
+ arenas.set(nextArena, mgr);
+ }
+ LocalManager res = new LocalManager();
+ res.mgr = mgr;
+ res.arenaId = nextArena;
+ nextArena = (nextArena + 1) % arenas.size();
+ return res;
+ }
+
+ public synchronized ResourceMemoryManager get(int i) {
+ return arenas.get(i);
+ }
+
+ public ResourceMemoryManager local() {
+ return local.get().mgr;
+ }
+
+ public int getLastHolder(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getLastHolder(localId(slotNum));
+ }
+
+ public void setLastHolder(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setLastHolder(localId(slotNum), value);
+ }
+
+ public int getFirstWaiter(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getFirstWaiter(localId(slotNum));
+ }
+
+ public void setFirstWaiter(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setFirstWaiter(localId(slotNum), value);
+ }
+
+ public int getFirstUpgrader(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getFirstUpgrader(localId(slotNum));
+ }
+
+ public void setFirstUpgrader(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setFirstUpgrader(localId(slotNum), value);
+ }
+
+ public int getMaxMode(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getMaxMode(localId(slotNum));
+ }
+
+ public void setMaxMode(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setMaxMode(localId(slotNum), value);
+ }
+
+ public int getDatasetId(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getDatasetId(localId(slotNum));
+ }
+
+ public void setDatasetId(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setDatasetId(localId(slotNum), value);
+ }
+
+ public int getPkHashVal(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getPkHashVal(localId(slotNum));
+ }
+
+ public void setPkHashVal(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setPkHashVal(localId(slotNum), value);
+ }
+
+ public int getNext(int slotNum) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ return local.get().mgr.getNext(localId(slotNum));
+ }
+
+ public void setNext(int slotNum, int value) {
+ final int arenaId = arenaId(slotNum);
+ if (arenaId != local.get().arenaId) {
+ local.get().arenaId = arenaId;
+ local.get().mgr = get(arenaId);
+ }
+ local.get().mgr.setNext(localId(slotNum), value);
+ }
+
+
+ static class LocalManager {
+ int arenaId;
+ ResourceMemoryManager mgr;
+ }
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
new file mode 100644
index 0000000..b49c7f8
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ResourceMemoryManager.java
@@ -0,0 +1,288 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class ResourceMemoryManager {
+
+ public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
+
+ static final int NO_SLOTS = 10;
+ static final int NEXT_FREE_SLOT_OFFSET = 0;
+
+ public static int ITEM_SIZE = 28;
+ public static int LAST_HOLDER_OFF = 0;
+ public static int FIRST_WAITER_OFF = 4;
+ public static int FIRST_UPGRADER_OFF = 8;
+ public static int MAX_MODE_OFF = 12;
+ public static int DATASET_ID_OFF = 16;
+ public static int PK_HASH_VAL_OFF = 20;
+ public static int NEXT_OFF = 24;
+
+
+ private ArrayList<Buffer> buffers;
+ private int current;
+ private int occupiedSlots;
+ private long shrinkTimer;
+ private boolean isShrinkTimerOn;
+
+ public int allocate() {
+ if (buffers.get(current).isFull()) {
+ int size = buffers.size();
+ boolean needNewBuffer = true;
+ Buffer buffer;
+
+ for (int i = 0; i < size; i++) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffer.initialize();
+ current = i;
+ needNewBuffer = false;
+ break;
+ }
+ }
+
+ if (needNewBuffer) {
+ buffers.add(new Buffer());
+ current = buffers.size() - 1;
+ }
+ }
+ ++occupiedSlots;
+ return buffers.get(current).allocate() + current * NO_SLOTS;
+ }
+
+ void deallocate(int slotNum) {
+ buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+ --occupiedSlots;
+
+ if (needShrink()) {
+ shrink();
+ }
+ }
+
+ /**
+ * Shrink policy:
+ * Shrink when the resource under-utilization lasts for a certain amount of time.
+ * TODO Need to figure out which of the policies is better
+ * case1.
+ * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+ * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+ * even if there is nothing to shrink or deallocate.
+ * It doesn't distinguish the deinitialized children from initialized children
+ * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+ * In other words, it doesn't subtract the deinitialized children's slots.
+ * case2.
+ * buffers status : O O x x x x x
+ * However, in the above case, if we subtract the deinitialized children's slots,
+ * needShrink() will return false even if we shrink the buffers at this case.
+ *
+ * @return
+ */
+ private boolean needShrink() {
+ int size = buffers.size();
+ int usedSlots = occupiedSlots;
+ if (usedSlots == 0) {
+ usedSlots = 1;
+ }
+
+ if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+ if (isShrinkTimerOn) {
+ if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+ isShrinkTimerOn = false;
+ return true;
+ }
+ } else {
+ //turn on timer
+ isShrinkTimerOn = true;
+ shrinkTimer = System.currentTimeMillis();
+ }
+ } else {
+ //turn off timer
+ isShrinkTimerOn = false;
+ }
+
+ return false;
+ }
+
+ /**
+ * Shrink() may
+ * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+ * shrink buffers according to the deinitialized children's contiguity status.
+ * It doesn't deinitialze or shrink more than half of children at a time.
+ */
+ private void shrink() {
+ int i;
+ int removeCount = 0;
+ int size = buffers.size();
+ int maxDecreaseCount = size / 2;
+ Buffer buffer;
+
+ //The first buffer never be deinitialized.
+ for (i = 1; i < size; i++) {
+ if (buffers.get(i).isEmpty()) {
+ buffers.get(i).deinitialize();
+ }
+ }
+
+ //remove the empty buffers from the end
+ for (i = size - 1; i >= 1; i--) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffers.remove(i);
+ if (++removeCount == maxDecreaseCount) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ //reset allocChild to the first buffer
+ current = 0;
+
+ isShrinkTimerOn = false;
+ }
+
+ public int getLastHolder(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF);
+ }
+
+ public void setLastHolder(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + LAST_HOLDER_OFF, value);
+ }
+
+ public int getFirstWaiter(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF);
+ }
+
+ public void setFirstWaiter(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_WAITER_OFF, value);
+ }
+
+ public int getFirstUpgrader(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF);
+ }
+
+ public void setFirstUpgrader(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + FIRST_UPGRADER_OFF, value);
+ }
+
+ public int getMaxMode(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF);
+ }
+
+ public void setMaxMode(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + MAX_MODE_OFF, value);
+ }
+
+ public int getDatasetId(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF);
+ }
+
+ public void setDatasetId(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + DATASET_ID_OFF, value);
+ }
+
+ public int getPkHashVal(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF);
+ }
+
+ public void setPkHashVal(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + PK_HASH_VAL_OFF, value);
+ }
+
+ public int getNext(int slotNum) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ return b.getInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF);
+ }
+
+ public void setNext(int slotNum, int value) {
+ final ByteBuffer b = buffers.get(slotNum / NO_SLOTS).bb;
+ b.putInt((slotNum % NO_SLOTS) * ITEM_SIZE + NEXT_OFF, value);
+ }
+
+
+
+ static class Buffer {
+ private ByteBuffer bb;
+ private int freeSlotNum;
+ private int occupiedSlots; //-1 represents 'deinitialized' state.
+
+ void initialize() {
+ bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+ freeSlotNum = 0;
+ occupiedSlots = 0;
+
+ for (int i = 0; i < NO_SLOTS - 1; i++) {
+ setNextFreeSlot(i, i + 1);
+ }
+ setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+ }
+
+ public void deinitialize() {
+ bb = null;
+ occupiedSlots = -1;
+ }
+
+ public boolean isInitialized() {
+ return occupiedSlots >= 0;
+ }
+
+ public boolean isFull() {
+ return occupiedSlots == NO_SLOTS;
+ }
+
+ public boolean isEmpty() {
+ return occupiedSlots == 0;
+ }
+
+ public int allocate() {
+ int currentSlot = freeSlotNum;
+ freeSlotNum = getNextFreeSlot(currentSlot);
+ bb.putInt(currentSlot * ITEM_SIZE + LAST_HOLDER_OFF, -1);
+ bb.putInt(currentSlot * ITEM_SIZE + FIRST_WAITER_OFF, -1);
+ bb.putInt(currentSlot * ITEM_SIZE + FIRST_UPGRADER_OFF, -1);
+ bb.putInt(currentSlot * ITEM_SIZE + MAX_MODE_OFF, LockMode.NL);
+ occupiedSlots++;
+ if (LockManager.IS_DEBUG_MODE) {
+ System.out.println(Thread.currentThread().getName()
+ + " allocate: " + currentSlot);
+ }
+ return currentSlot;
+ }
+
+ public void deallocate(int slotNum) {
+ setNextFreeSlot(slotNum, freeSlotNum);
+ freeSlotNum = slotNum;
+ occupiedSlots--;
+ if (LockManager.IS_DEBUG_MODE) {
+ System.out.println(Thread.currentThread().getName()
+ + " deallocate: " + slotNum);
+ }
+ }
+
+ public int getNextFreeSlot(int slotNum) {
+ return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET);
+ }
+
+ public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+ bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET, nextFreeSlot);
+ }
+ }
+
+}
+
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
new file mode 100644
index 0000000..d28919d
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/StructuredMemoryManager.java.txt
@@ -0,0 +1,207 @@
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+public class @E@MemoryManager {
+
+ public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
+
+ static final int NO_SLOTS = 10;
+ static final int NEXT_FREE_SLOT_OFFSET = 0;
+
+ @CONSTS@
+
+ private ArrayList<Buffer> buffers;
+ private int current;
+ private int occupiedSlots;
+ private long shrinkTimer;
+ private boolean isShrinkTimerOn;
+
+ public int allocate() {
+ if (buffers.get(current).isFull()) {
+ int size = buffers.size();
+ boolean needNewBuffer = true;
+ Buffer buffer;
+
+ for (int i = 0; i < size; i++) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffer.initialize();
+ current = i;
+ needNewBuffer = false;
+ break;
+ }
+ }
+
+ if (needNewBuffer) {
+ buffers.add(new Buffer());
+ current = buffers.size() - 1;
+ }
+ }
+ ++occupiedSlots;
+ return buffers.get(current).allocate() + current * NO_SLOTS;
+ }
+
+ void deallocate(int slotNum) {
+ buffers.get(slotNum / NO_SLOTS).deallocate(slotNum % NO_SLOTS);
+ --occupiedSlots;
+
+ if (needShrink()) {
+ shrink();
+ }
+ }
+
+ /**
+ * Shrink policy:
+ * Shrink when the resource under-utilization lasts for a certain amount of time.
+ * TODO Need to figure out which of the policies is better
+ * case1.
+ * buffers status : O x x x x x O (O is initialized, x is deinitialized)
+ * In the above status, 'CURRENT' needShrink() returns 'TRUE'
+ * even if there is nothing to shrink or deallocate.
+ * It doesn't distinguish the deinitialized children from initialized children
+ * by calculating totalNumOfSlots = buffers.size() * ChildEntityLockInfoArrayManager.NUM_OF_SLOTS.
+ * In other words, it doesn't subtract the deinitialized children's slots.
+ * case2.
+ * buffers status : O O x x x x x
+ * However, in the above case, if we subtract the deinitialized children's slots,
+ * needShrink() will return false even if we shrink the buffers at this case.
+ *
+ * @return
+ */
+ private boolean needShrink() {
+ int size = buffers.size();
+ int usedSlots = occupiedSlots;
+ if (usedSlots == 0) {
+ usedSlots = 1;
+ }
+
+ if (size > 1 && size * NO_SLOTS / usedSlots >= 3) {
+ if (isShrinkTimerOn) {
+ if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+ isShrinkTimerOn = false;
+ return true;
+ }
+ } else {
+ //turn on timer
+ isShrinkTimerOn = true;
+ shrinkTimer = System.currentTimeMillis();
+ }
+ } else {
+ //turn off timer
+ isShrinkTimerOn = false;
+ }
+
+ return false;
+ }
+
+ /**
+ * Shrink() may
+ * deinitialize(:deallocates ByteBuffer of child) Children(s) or
+ * shrink buffers according to the deinitialized children's contiguity status.
+ * It doesn't deinitialze or shrink more than half of children at a time.
+ */
+ private void shrink() {
+ int i;
+ int removeCount = 0;
+ int size = buffers.size();
+ int maxDecreaseCount = size / 2;
+ Buffer buffer;
+
+ //The first buffer never be deinitialized.
+ for (i = 1; i < size; i++) {
+ if (buffers.get(i).isEmpty()) {
+ buffers.get(i).deinitialize();
+ }
+ }
+
+ //remove the empty buffers from the end
+ for (i = size - 1; i >= 1; i--) {
+ buffer = buffers.get(i);
+ if (! buffer.isInitialized()) {
+ buffers.remove(i);
+ if (++removeCount == maxDecreaseCount) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ //reset allocChild to the first buffer
+ current = 0;
+
+ isShrinkTimerOn = false;
+ }
+
+ @METHODS@
+
+
+ static class Buffer {
+ private ByteBuffer bb;
+ private int freeSlotNum;
+ private int occupiedSlots; //-1 represents 'deinitialized' state.
+
+ void initialize() {
+ bb = ByteBuffer.allocate(NO_SLOTS * ITEM_SIZE);
+ freeSlotNum = 0;
+ occupiedSlots = 0;
+
+ for (int i = 0; i < NO_SLOTS - 1; i++) {
+ setNextFreeSlot(i, i + 1);
+ }
+ setNextFreeSlot(NO_SLOTS - 1, -1); //-1 represents EOL(end of link)
+ }
+
+ public void deinitialize() {
+ bb = null;
+ occupiedSlots = -1;
+ }
+
+ public boolean isInitialized() {
+ return occupiedSlots >= 0;
+ }
+
+ public boolean isFull() {
+ return occupiedSlots == NO_SLOTS;
+ }
+
+ public boolean isEmpty() {
+ return occupiedSlots == 0;
+ }
+
+ public int allocate() {
+ int currentSlot = freeSlotNum;
+ freeSlotNum = getNextFreeSlot(currentSlot);
+ @INIT_SLOT@
+ occupiedSlots++;
+ if (LockManager.IS_DEBUG_MODE) {
+ System.out.println(Thread.currentThread().getName()
+ + " allocate: " + currentSlot);
+ }
+ return currentSlot;
+ }
+
+ public void deallocate(int slotNum) {
+ setNextFreeSlot(slotNum, freeSlotNum);
+ freeSlotNum = slotNum;
+ occupiedSlots--;
+ if (LockManager.IS_DEBUG_MODE) {
+ System.out.println(Thread.currentThread().getName()
+ + " deallocate: " + slotNum);
+ }
+ }
+
+ public int getNextFreeSlot(int slotNum) {
+ return bb.getInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET);
+ }
+
+ public void setNextFreeSlot(int slotNum, int nextFreeSlot) {
+ bb.putInt(slotNum * ITEM_SIZE + NEXT_FREE_SLOT_OFFSET, nextFreeSlot);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 78bca42..28af321 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -30,16 +30,12 @@
}
public static class LockManagerConstants {
- public static final String LOCK_CONF_DIR = "lock_conf";
- public static final String LOCK_CONF_FILE = "lock.conf";
- public static final int[] LOCK_CONFLICT_MATRIX = new int[] { 2, 3 };
- public static final int[] LOCK_CONVERT_MATRIX = new int[] { 2, 0 };
-
public static class LockMode {
- public static final byte S = 0;
- public static final byte X = 1;
- public static final byte IS = 2;
- public static final byte IX = 3;
+ public static final byte NL = 0;
+ public static final byte IS = 1;
+ public static final byte IX = 2;
+ public static final byte S = 3;
+ public static final byte X = 4;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index aceeb82..4cb5fac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,7 +22,7 @@
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.locking.ConcurrentLockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -46,7 +46,7 @@
this.id = id;
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
- this.lockManager = new LockManager(this);
+ this.lockManager = new ConcurrentLockManager(this);
this.logManager = new LogManager(this);
this.recoveryManager = new RecoveryManager(this);
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;