ASTERIXDB-1118: allow for lock conversion
Also improve debugability of ConcurrentLockManager and add new unit tests.
Change-Id: If49ed8d48fa8c71a52c880d4f42a2badbe6a57d7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/474
Reviewed-by: Taewoo Kim <wangsaeu@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index e5ad1b4..ae4aaa5 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -116,6 +116,6 @@
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
</dependency>
- </dependencies>
+ </dependencies>
</project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 3114195..af508f5 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -19,6 +19,14 @@
package org.apache.asterix.transaction.management.service.locking;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -31,30 +39,20 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.config.AsterixTransactionProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILockManager;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
-import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-
/**
- * An implementation of the ILockManager interface.
+ * A concurrent implementation of the ILockManager interface.
*
- * @author tillw
+ * @see ResourceGroupTable
+ * @see ResourceGroup
*/
public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
- private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
- private static final Level LVL = Level.FINER;
+ static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
+ static final Level LVL = Level.FINER;
public static final boolean DEBUG_MODE = false;//true
public static final boolean CHECK_CONSISTENCY = false;
- private TransactionSubsystem txnSubsystem;
private ResourceGroupTable table;
private ResourceArenaManager resArenaMgr;
private RequestArenaManager reqArenaMgr;
@@ -81,22 +79,21 @@
static LockAction[][] ACTION_MATRIX = {
// new NL IS IX S X
- { LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD }, // NL
- { LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT }, // IS
- { LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT }, // IX
- { LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT }, // S
- { LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT } // X
+ {LockAction.ERR, LockAction.UPD, LockAction.UPD, LockAction.UPD, LockAction.UPD}, // NL
+ {LockAction.ERR, LockAction.GET, LockAction.UPD, LockAction.UPD, LockAction.WAIT}, // IS
+ {LockAction.ERR, LockAction.GET, LockAction.GET, LockAction.WAIT, LockAction.WAIT}, // IX
+ {LockAction.ERR, LockAction.GET, LockAction.WAIT, LockAction.GET, LockAction.WAIT}, // S
+ {LockAction.ERR, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT, LockAction.WAIT} // X
};
- public ConcurrentLockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
- this.txnSubsystem = txnSubsystem;
+ public ConcurrentLockManager(final int lockManagerShrinkTimer) throws ACIDException {
+ this(lockManagerShrinkTimer, Runtime.getRuntime().availableProcessors() * 2, 1024);
+ // TODO increase table size?
+ }
- this.table = new ResourceGroupTable();
-
- final int lockManagerShrinkTimer = txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer();
-
- int noArenas = Runtime.getRuntime().availableProcessors() * 2;
-
+ public ConcurrentLockManager(final int lockManagerShrinkTimer, final int noArenas, final int tableSize) throws
+ ACIDException {
+ this.table = new ResourceGroupTable(tableSize);
resArenaMgr = new ResourceArenaManager(noArenas, lockManagerShrinkTimer);
reqArenaMgr = new RequestArenaManager(noArenas, lockManagerShrinkTimer);
jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
@@ -108,10 +105,6 @@
};
}
- public AsterixTransactionProperties getTransactionProperties() {
- return this.txnSubsystem.getTransactionProperties();
- }
-
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
@@ -142,6 +135,18 @@
while (!locked) {
final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
+ case CONV:
+ if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
+ DeadlockTracker tracker = new CollectingTracker();
+ tracker.pushJob(jobSlot);
+ introducesDeadlock(resSlot, jobSlot, tracker);
+ requestAbort(txnContext, tracker.toString());
+ break;
+ } else if (hasOtherHolders(resSlot, jobSlot)) {
+ enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
+ break;
+ }
+ //no break
case UPD:
resArenaMgr.setMaxMode(resSlot, lockMode);
// no break
@@ -150,7 +155,6 @@
locked = true;
break;
case WAIT:
- case CONV:
enqueueWaiter(group, reqSlot, resSlot, jobSlot, act, txnContext);
break;
case ERR:
@@ -161,6 +165,8 @@
if (entityHashValue == -1) {
dsLockCache.get().put(jobId, dsId, lockMode);
}
+ } catch (InterruptedException e) {
+ throw new WaitInterruptedException(txnContext, "interrupted", e);
} finally {
group.releaseLatch();
}
@@ -170,7 +176,8 @@
}
private void enqueueWaiter(final ResourceGroup group, final long reqSlot, final long resSlot, final long jobSlot,
- final LockAction act, ITransactionContext txnContext) throws ACIDException {
+ final LockAction act, ITransactionContext txnContext) throws ACIDException,
+ InterruptedException {
final Queue queue = act.modify ? upgrader : waiter;
if (introducesDeadlock(resSlot, jobSlot, NOPTracker.INSTANCE)) {
DeadlockTracker tracker = new CollectingTracker();
@@ -214,6 +221,9 @@
}
static class CollectingTracker implements DeadlockTracker {
+
+ static final boolean DEBUG = false;
+
ArrayList<Long> slots = new ArrayList<Long>();
ArrayList<String> types = new ArrayList<String>();
@@ -221,26 +231,26 @@
public void pushResource(long resSlot) {
types.add("Resource");
slots.add(resSlot);
- System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
}
@Override
public void pushRequest(long reqSlot) {
types.add("Request");
slots.add(reqSlot);
- System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
}
@Override
public void pushJob(long jobSlot) {
types.add("Job");
slots.add(jobSlot);
- System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG) System.err.println("push " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
}
@Override
public void pop() {
- System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
+ if (DEBUG) System.err.println("pop " + types.get(types.size() - 1) + " " + slots.get(slots.size() - 1));
types.remove(types.size() - 1);
slots.remove(slots.size() - 1);
}
@@ -257,15 +267,19 @@
/**
* determine if adding a job to the waiters of a resource will introduce a
- * cycle in the wait-graph where the job waits on itself
- *
- * @param resSlot
- * the slot that contains the information about the resource
- * @param jobSlot
- * the slot that contains the information about the job
+ * cycle in the wait-graph where the job waits on itself - but not directly on itself (which happens e.g. in the
+ * case of upgrading a lock from S to X).
+ *
+ * @param resSlot the slot that contains the information about the resource
+ * @param jobSlot the slot that contains the information about the job
* @return true if a cycle would be introduced, false otherwise
*/
private boolean introducesDeadlock(final long resSlot, final long jobSlot, final DeadlockTracker tracker) {
+ return introducesDeadlock(resSlot, jobSlot, tracker, 0);
+ }
+
+ private boolean introducesDeadlock(final long resSlot, final long jobSlot,
+ final DeadlockTracker tracker, final int depth) {
synchronized (jobArenaMgr) {
tracker.pushResource(resSlot);
long reqSlot = resArenaMgr.getLastHolder(resSlot);
@@ -273,14 +287,22 @@
tracker.pushRequest(reqSlot);
final long holderJobSlot = reqArenaMgr.getJobSlot(reqSlot);
tracker.pushJob(holderJobSlot);
- if (holderJobSlot == jobSlot) {
+ if (holderJobSlot == jobSlot && depth != 0) {
return true;
}
+
+ // To determine if we have a deadlock we need to look at the waiters and at the upgraders.
+ // The scanWaiters flag indicates if we are currently scanning the waiters (true) or the upgraders
+ // (false).
boolean scanWaiters = true;
long waiter = jobArenaMgr.getLastWaiter(holderJobSlot);
+ if (waiter < 0 && scanWaiters) {
+ scanWaiters = false;
+ waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
+ }
while (waiter >= 0) {
- long watingOnResSlot = reqArenaMgr.getResourceId(waiter);
- if (introducesDeadlock(watingOnResSlot, jobSlot, tracker)) {
+ long waitingOnResSlot = reqArenaMgr.getResourceId(waiter);
+ if (introducesDeadlock(waitingOnResSlot, jobSlot, tracker, depth + 1)) {
return true;
}
waiter = reqArenaMgr.getNextJobRequest(waiter);
@@ -289,6 +311,7 @@
waiter = jobArenaMgr.getLastUpgrader(holderJobSlot);
}
}
+
tracker.pop(); // job
tracker.pop(); // request
reqSlot = reqArenaMgr.getNextRequest(reqSlot);
@@ -354,6 +377,8 @@
throw new IllegalStateException();
}
}
+ } catch (InterruptedException e) {
+ throw new WaitInterruptedException(txnContext, "interrupted", e);
} finally {
if (reqSlot != -1) {
// deallocate request, if we allocated one earlier
@@ -422,7 +447,7 @@
@Override
public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
- ITransactionContext txnContext) throws ACIDException {
+ ITransactionContext txnContext) throws ACIDException {
log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
stats.instantTryLock();
@@ -524,17 +549,15 @@
final int oldMaxMode = resArenaMgr.getMaxMode(resource);
final int newMaxMode = determineNewMaxMode(resource, oldMaxMode);
resArenaMgr.setMaxMode(resource, newMaxMode);
- if (oldMaxMode != newMaxMode) {
- // the locking mode didn't change, current waiters won't be
- // able to acquire the lock, so we do not need to signal them
- group.wakeUp();
- }
+ group.wakeUp();
}
} finally {
group.releaseLatch();
}
- // dataset intention locks are cleaned up at the end of the job
+ // dataset intention locks are
+ // a) kept in dsLockCache and
+ // b) cleaned up only in releaseLocks at the end of the job
}
@Override
@@ -606,13 +629,15 @@
resArenaMgr.setPkHashVal(resSlot, entityHashValue);
resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
group.firstResourceIndex.set(resSlot);
- if (DEBUG_MODE)
- LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue
- + ")");
+ if (DEBUG_MODE) {
+ LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " +
+ entityHashValue + ")");
+ }
} else {
- if (DEBUG_MODE)
- LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue
- + ")");
+ if (DEBUG_MODE) {
+ LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " +
+ entityHashValue + ")");
+ }
}
return resSlot;
}
@@ -644,13 +669,10 @@
* a) (wait and) convert the lock once conversion becomes viable or
* b) acquire the lock if we want to lock the same resource with the same
* lock mode for the same job.
- *
- * @param resource
- * the resource slot that's being locked
- * @param job
- * the job slot of the job locking the resource
- * @param lockMode
- * the lock mode that the resource should be locked with
+ *
+ * @param resource the resource slot that's being locked
+ * @param job the job slot of the job locking the resource
+ * @param lockMode the lock mode that the resource should be locked with
* @return
*/
private LockAction updateActionForSameJob(long resource, long job, byte lockMode) {
@@ -698,6 +720,17 @@
}
}
+ private boolean hasOtherHolders(long resSlot, long jobSlot) {
+ long holder = resArenaMgr.getLastHolder(resSlot);
+ while (holder != -1) {
+ if (reqArenaMgr.getJobSlot(holder) != jobSlot) {
+ return true;
+ }
+ holder = reqArenaMgr.getNextRequest(holder);
+ }
+ return false;
+ }
+
private long removeLastHolder(long resource, long jobSlot, byte lockMode) {
long holder = resArenaMgr.getLastHolder(resource);
if (holder < 0) {
@@ -848,13 +881,10 @@
* remove the first request for a given job and lock mode from a request queue.
* If the value of the parameter lockMode is LockMode.ANY the first request
* for the job is removed - independent of the LockMode.
- *
- * @param head
- * the head of the request queue
- * @param jobSlot
- * the job slot
- * @param lockMode
- * the lock mode
+ *
+ * @param head the head of the request queue
+ * @param jobSlot the job slot
+ * @param lockMode the lock mode
* @return the slot of the first request that matched the given job
*/
private long removeRequestFromQueueForJob(long head, long jobSlot, byte lockMode) {
@@ -947,30 +977,34 @@
}
private void assertLocksCanBefoundInJobQueue() throws ACIDException {
- for (int i = 0; i < ResourceGroupTable.TABLE_SIZE; ++i) {
- final ResourceGroup group = table.get(i);
- if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
- try {
- long resSlot = group.firstResourceIndex.get();
- while (resSlot != -1) {
- int dsId = resArenaMgr.getDatasetId(resSlot);
- int entityHashValue = resArenaMgr.getPkHashVal(resSlot);
- long reqSlot = resArenaMgr.getLastHolder(resSlot);
- while (reqSlot != -1) {
- byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot);
- long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
- int jobId = jobArenaMgr.getJobId(jobSlot);
- assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
- reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ try {
+ for (int i = 0; i < table.size; ++i) {
+ final ResourceGroup group = table.get(i);
+ if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
+ try {
+ long resSlot = group.firstResourceIndex.get();
+ while (resSlot != -1) {
+ int dsId = resArenaMgr.getDatasetId(resSlot);
+ int entityHashValue = resArenaMgr.getPkHashVal(resSlot);
+ long reqSlot = resArenaMgr.getLastHolder(resSlot);
+ while (reqSlot != -1) {
+ byte lockMode = (byte) reqArenaMgr.getLockMode(reqSlot);
+ long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
+ int jobId = jobArenaMgr.getJobId(jobSlot);
+ assertLockCanBeFoundInJobQueue(dsId, entityHashValue, lockMode, jobId);
+ reqSlot = reqArenaMgr.getNextRequest(reqSlot);
+ }
+ resSlot = resArenaMgr.getNext(resSlot);
}
- resSlot = resArenaMgr.getNext(resSlot);
+ } finally {
+ group.releaseLatch();
}
- } finally {
- group.releaseLatch();
+ } else {
+ LOGGER.warning("Could not check locks for " + group);
}
- } else {
- LOGGER.warning("Could not check locks for " + group);
}
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("interrupted", e);
}
}
@@ -986,7 +1020,7 @@
/**
* tries to find a lock request searching though the job queue
- *
+ *
* @param dsId
* dataset id
* @param entityHashValue
@@ -1021,66 +1055,30 @@
return -1;
}
- private String resQueueToString(long resSlot) {
- return appendResQueue(new StringBuilder(), resSlot).toString();
+ private TablePrinter getResourceTablePrinter() {
+ return new ResourceTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr);
}
- private StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
- resArenaMgr.appendRecord(sb, resSlot);
- sb.append("\n");
- appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot));
- return sb;
+ private TablePrinter getDumpTablePrinter() {
+ return new DumpTablePrinter(table, resArenaMgr, reqArenaMgr, jobArenaMgr, jobIdSlotMap);
}
- private StringBuilder appendReqQueue(StringBuilder sb, long head) {
- while (head != -1) {
- reqArenaMgr.appendRecord(sb, head);
- sb.append("\n");
- head = reqArenaMgr.getNextRequest(head);
- }
- return sb;
- }
-
- public StringBuilder append(StringBuilder sb) {
- table.getAllLatches();
- try {
- sb.append(">>dump_begin\t>>----- [resTable] -----\n");
- table.append(sb);
- sb.append(">>dump_end\t>>----- [resTable] -----\n");
-
- sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
- resArenaMgr.append(sb);
- sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
-
- sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
- reqArenaMgr.append(sb);
- sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
-
- sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
- for (Integer i : jobIdSlotMap.keySet()) {
- sb.append(i).append(" : ");
- TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
- sb.append("\n");
- }
- sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
-
- sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
- jobArenaMgr.append(sb);
- sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
- } finally {
- table.releaseAllLatches();
- }
- return sb;
+ public String printByResource() {
+ return getResourceTablePrinter().append(new StringBuilder()).append("\n").toString();
}
public String toString() {
- return append(new StringBuilder()).toString();
+ return printByResource();
+ }
+
+ public String dump() {
+ return getDumpTablePrinter().append(new StringBuilder()).toString();
}
@Override
public String prettyPrint() throws ACIDException {
StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n");
- return append(s).toString() + "\n";
+ return getDumpTablePrinter().append(s).toString() + "\n";
}
@Override
@@ -1090,7 +1088,7 @@
@Override
public void dumpState(OutputStream os) throws IOException {
- os.write(toString().getBytes());
+ os.write(dump().getBytes());
}
@Override
@@ -1140,119 +1138,5 @@
}
}
- private static class ResourceGroupTable {
- public static final int TABLE_SIZE = 1024; // TODO increase?
-
- private ResourceGroup[] table;
-
- public ResourceGroupTable() {
- table = new ResourceGroup[TABLE_SIZE];
- for (int i = 0; i < TABLE_SIZE; ++i) {
- table[i] = new ResourceGroup();
- }
- }
-
- ResourceGroup get(int dId, int entityHashValue) {
- // TODO ensure good properties of hash function
- int h = Math.abs(dId ^ entityHashValue);
- if (h < 0)
- h = 0;
- return table[h % TABLE_SIZE];
- }
-
- ResourceGroup get(int i) {
- return table[i];
- }
-
- public void getAllLatches() {
- for (int i = 0; i < TABLE_SIZE; ++i) {
- table[i].getLatch();
- }
- }
-
- public void releaseAllLatches() {
- for (int i = 0; i < TABLE_SIZE; ++i) {
- table[i].releaseLatch();
- }
- }
-
- public StringBuilder append(StringBuilder sb) {
- return append(sb, false);
- }
-
- public StringBuilder append(StringBuilder sb, boolean detail) {
- for (int i = 0; i < table.length; ++i) {
- sb.append(i).append(" : ");
- if (detail) {
- sb.append(table[i]);
- } else {
- sb.append(table[i].firstResourceIndex);
- }
- sb.append('\n');
- }
- return sb;
- }
- }
-
- private static class ResourceGroup {
- private ReentrantReadWriteLock latch;
- private Condition condition;
- AtomicLong firstResourceIndex;
-
- ResourceGroup() {
- latch = new ReentrantReadWriteLock();
- condition = latch.writeLock().newCondition();
- firstResourceIndex = new AtomicLong(-1);
- }
-
- void getLatch() {
- log("latch");
- latch.writeLock().lock();
- }
-
- boolean tryLatch(long timeout, TimeUnit unit) throws ACIDException {
- log("tryLatch");
- try {
- return latch.writeLock().tryLock(timeout, unit);
- } catch (InterruptedException e) {
- LOGGER.finer("interrupted while wating on ResourceGroup");
- throw new ACIDException("interrupted", e);
- }
- }
-
- void releaseLatch() {
- log("release");
- latch.writeLock().unlock();
- }
-
- boolean hasWaiters() {
- return latch.hasQueuedThreads();
- }
-
- void await(ITransactionContext txnContext) throws ACIDException {
- log("wait for");
- try {
- condition.await();
- } catch (InterruptedException e) {
- LOGGER.finer("interrupted while wating on ResourceGroup");
- throw new ACIDException(txnContext, "interrupted", e);
- }
- }
-
- void wakeUp() {
- log("notify");
- condition.signalAll();
- }
-
- void log(String s) {
- if (LOGGER.isLoggable(LVL)) {
- LOGGER.log(LVL, s + " " + toString());
- }
- }
-
- public String toString() {
- return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get())
- + ", waiters : " + (hasWaiters() ? "true" : "false") + " }";
- }
- }
}
+
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
new file mode 100644
index 0000000..ffdb151
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/DumpTablePrinter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DumpTablePrinter implements TablePrinter {
+ private ResourceGroupTable table;
+ private ResourceArenaManager resArenaMgr;
+ private RequestArenaManager reqArenaMgr;
+ private JobArenaManager jobArenaMgr;
+ private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
+
+ DumpTablePrinter(ResourceGroupTable table,
+ ResourceArenaManager resArenaMgr,
+ RequestArenaManager reqArenaMgr,
+ JobArenaManager jobArenaMgr,
+ ConcurrentHashMap<Integer, Long> jobIdSlotMap) {
+ this.table = table;
+ this.resArenaMgr = resArenaMgr;
+ this.reqArenaMgr = reqArenaMgr;
+ this.jobArenaMgr = jobArenaMgr;
+ this.jobIdSlotMap = jobIdSlotMap;
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ table.getAllLatches();
+ try {
+ sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+ table.append(sb);
+ sb.append(">>dump_end\t>>----- [resTable] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+ resArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+ reqArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+ for (Integer i : jobIdSlotMap.keySet()) {
+ sb.append(i).append(" : ");
+ TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
+ sb.append("\n");
+ }
+ sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+ jobArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+ } finally {
+ table.releaseAllLatches();
+ }
+ return sb;
+ }
+
+ String resQueueToString(long resSlot) {
+ return appendResQueue(new StringBuilder(), resSlot).toString();
+ }
+
+ StringBuilder appendResQueue(StringBuilder sb, long resSlot) {
+ resArenaMgr.appendRecord(sb, resSlot);
+ sb.append("\n");
+ appendReqQueue(sb, resArenaMgr.getLastHolder(resSlot));
+ return sb;
+ }
+
+ StringBuilder appendReqQueue(StringBuilder sb, long head) {
+ while (head != -1) {
+ reqArenaMgr.appendRecord(sb, head);
+ sb.append("\n");
+ head = reqArenaMgr.getNextRequest(head);
+ }
+ return sb;
+ }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index aaa96bb..1dbf16b 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -392,7 +392,6 @@
class LockRequestWorker implements Runnable {
String threadName;
- TransactionSubsystem txnProvider;
ILockManager lockMgr;
WorkerReadyQueue workerReadyQueue;
LockRequest lockRequest;
@@ -401,7 +400,6 @@
boolean isDone;
public LockRequestWorker(TransactionSubsystem txnProvider, WorkerReadyQueue workerReadyQueue, String threadName) {
- this.txnProvider = txnProvider;
this.lockMgr = txnProvider.getLockManager();
this.workerReadyQueue = workerReadyQueue;
this.threadName = new String(threadName);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java
new file mode 100644
index 0000000..bec4e53
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroup.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.transactions.ITransactionContext;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A ResourceGroup represents a group of resources that are manged by a ConcurrentLockManager.
+ * All resources in a group share a common latch. I.e. all modifications of lock requests for any resource in a group
+ * are protected by the same latch.
+ *
+ * @see ConcurrentLockManager
+ */
+class ResourceGroup {
+ private ReentrantReadWriteLock latch;
+ private Condition condition;
+ AtomicLong firstResourceIndex;
+
+ ResourceGroup() {
+ latch = new ReentrantReadWriteLock();
+ condition = latch.writeLock().newCondition();
+ firstResourceIndex = new AtomicLong(-1);
+ }
+
+ void getLatch() {
+ log("latch");
+ latch.writeLock().lock();
+ }
+
+ boolean tryLatch(long timeout, TimeUnit unit) throws InterruptedException {
+ log("tryLatch");
+ try {
+ return latch.writeLock().tryLock(timeout, unit);
+ } catch (InterruptedException e) {
+ ConcurrentLockManager.LOGGER.finer("interrupted while wating on ResourceGroup");
+ throw e;
+ }
+ }
+
+ void releaseLatch() {
+ log("release");
+ latch.writeLock().unlock();
+ }
+
+ boolean hasWaiters() {
+ return latch.hasQueuedThreads();
+ }
+
+ void await(ITransactionContext txnContext) throws InterruptedException {
+ log("wait for");
+ try {
+ condition.await();
+ } catch (InterruptedException e) {
+ ConcurrentLockManager.LOGGER.finer("interrupted while waiting on ResourceGroup");
+ throw e;
+ }
+ }
+
+ void wakeUp() {
+ log("notify");
+ condition.signalAll();
+ }
+
+ void log(String s) {
+ if (ConcurrentLockManager.LOGGER.isLoggable(ConcurrentLockManager.LVL)) {
+ ConcurrentLockManager.LOGGER.log(ConcurrentLockManager.LVL, s + " " + toString());
+ }
+ }
+
+ public String toString() {
+ return "{ id : " + hashCode() + ", first : " + TypeUtil.Global.toString(firstResourceIndex.get()) + ", " +
+ "waiters : " + (hasWaiters() ? "true" : "false") + " }";
+ }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java
new file mode 100644
index 0000000..213ccd9
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceGroupTable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.locking;
+
+/**
+ * A hash table for ResourceGroups. As each ResourceGroup has a latch that protects the modifications for resources in
+ * that group, the size of a ResourceGroupTable determines the maximal number of lock requests that can concurrently
+ * be served by a ConcurrentLockManager.
+ *
+ * @see ResourceGroup
+ * @see ConcurrentLockManager
+ */
+
+class ResourceGroupTable {
+ public final int size;
+
+ private ResourceGroup[] table;
+
+ public ResourceGroupTable(int size) {
+ this.size = size;
+ table = new ResourceGroup[size];
+ for (int i = 0; i < size; ++i) {
+ table[i] = new ResourceGroup();
+ }
+ }
+
+ ResourceGroup get(int dId, int entityHashValue) {
+ // TODO ensure good properties of hash function
+ int h = Math.abs(dId ^ entityHashValue);
+ if (h < 0) h = 0;
+ return table[h % size];
+ }
+
+ ResourceGroup get(int i) {
+ return table[i];
+ }
+
+ public void getAllLatches() {
+ for (int i = 0; i < size; ++i) {
+ table[i].getLatch();
+ }
+ }
+
+ public void releaseAllLatches() {
+ for (int i = 0; i < size; ++i) {
+ table[i].releaseLatch();
+ }
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ return append(sb, false);
+ }
+
+ public StringBuilder append(StringBuilder sb, boolean detail) {
+ for (int i = 0; i < table.length; ++i) {
+ sb.append(i).append(" : ");
+ if (detail) {
+ sb.append(table[i]);
+ } else {
+ sb.append(table[i].firstResourceIndex);
+ }
+ sb.append('\n');
+ }
+ return sb;
+ }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
new file mode 100644
index 0000000..90c1f69
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ResourceTablePrinter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+
+import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+
+/**
+ * Creates a JSON serialization of the lock table of the ConcurrentLockManager organized by resource. I.e. the
+ * serialization will contain all resources for which lock request are recorded in the table - along with a list of
+ * the requests for each resource.
+ *
+ * @see ConcurrentLockManager
+ */
+public class ResourceTablePrinter implements TablePrinter {
+ private ResourceGroupTable table;
+ private ResourceArenaManager resArenaMgr;
+ private RequestArenaManager reqArenaMgr;
+ private JobArenaManager jobArenaMgr;
+
+ ResourceTablePrinter(ResourceGroupTable table,
+ ResourceArenaManager resArenaMgr,
+ RequestArenaManager reqArenaMgr,
+ JobArenaManager jobArenaMgr) {
+ this.table = table;
+ this.resArenaMgr = resArenaMgr;
+ this.reqArenaMgr = reqArenaMgr;
+ this.jobArenaMgr = jobArenaMgr;
+ }
+
+ public StringBuilder append(StringBuilder sb) {
+ table.getAllLatches();
+ sb.append("[\n");
+ int i = 0;
+ long res = -1;
+ while (res == -1 && i < table.size) {
+ res = table.get(i++).firstResourceIndex.get();
+ }
+ while (i < table.size) {
+ sb = appendResource(sb, res);
+ res = resArenaMgr.getNext(res);
+ while (res == -1 && i < table.size) {
+ res = table.get(i++).firstResourceIndex.get();
+ }
+ if (res == -1) {
+ sb.append("\n");
+ break;
+ } else {
+ sb.append(",\n");
+ }
+ }
+ table.releaseAllLatches();
+ return sb.append("]");
+ }
+
+ StringBuilder appendResource(StringBuilder sb, long res) {
+ sb.append("{ \"dataset\": ").append(resArenaMgr.getDatasetId(res));
+ sb.append(", \"hash\": ").append(resArenaMgr.getPkHashVal(res));
+ sb.append(", \"max mode\": ").append(string(resArenaMgr.getMaxMode(res)));
+ long lastHolder = resArenaMgr.getLastHolder(res);
+ if (lastHolder != -1) {
+ sb = appendRequests(sb.append(", \"holders\": "), lastHolder);
+ }
+ long firstUpgrader = resArenaMgr.getFirstUpgrader(res);
+ if (firstUpgrader != -1) {
+ sb = appendRequests(sb.append(", \"upgraders\": "), firstUpgrader);
+ }
+ long firstWaiter = resArenaMgr.getFirstWaiter(res);
+ if (firstWaiter != -1) {
+ sb = appendRequests(sb.append(", \"waiters\": "), firstWaiter);
+ }
+ return sb.append(" }");
+ }
+
+ StringBuilder appendRequests(StringBuilder sb, long req) {
+ sb.append("[ ");
+ while (req != -1) {
+ appendRequest(sb, req);
+ req = reqArenaMgr.getNextRequest(req);
+ sb.append(req == -1 ? " ]" : ", ");
+ }
+ return sb;
+ }
+
+ StringBuilder appendRequest(StringBuilder sb, long req) {
+ long job = reqArenaMgr.getJobSlot(req);
+ sb.append("{ \"job\": ").append(jobArenaMgr.getJobId(job));
+ sb.append(", \"mode\": \"").append(string(reqArenaMgr.getLockMode(req)));
+ return sb.append("\" }");
+ }
+
+ private static final String string(int lockMode) {
+ return LockMode.toString((byte) lockMode);
+ }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
new file mode 100644
index 0000000..2b4260b
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TablePrinter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.service.locking;
+
+public interface TablePrinter {
+ StringBuilder append(StringBuilder sb);
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
new file mode 100644
index 0000000..8171f77
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/WaitInterruptedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ITransactionContext;
+
+public class WaitInterruptedException extends ACIDException {
+ public WaitInterruptedException(ITransactionContext txnContext, String message, Throwable cause) {
+ super(txnContext, message, cause);
+ }
+}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index d371e94..6650ac6 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -54,7 +54,7 @@
this.id = id;
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
- this.lockManager = new ConcurrentLockManager(this);
+ this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
AsterixReplicationProperties asterixReplicationProperties = null;
if (asterixAppRuntimeContextProvider != null) {
@@ -108,4 +108,4 @@
return id;
}
-}
\ No newline at end of file
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
new file mode 100644
index 0000000..5cabd04
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
@@ -0,0 +1,398 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.asterix.transaction.management.service.locking;
+
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.JobId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Logger;
+
+import static org.apache.asterix.transaction.management.service.locking.Request.Kind;
+import static org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class LockManagerUnitTest {
+
+ public static int LOCK_MGR_SHRINK_TIMER = 5000;
+ public static int LOCK_MGR_ARENAS = 2;
+ public static int LOCK_MGR_TABLE_SIZE = 10;
+
+ static int INITIAL_TIMESTAMP = 0;
+ static long COORDINATOR_SLEEP = 20;
+ static int TIMEOUT_MS = 100;
+
+ static {
+ Logger.getLogger(ConcurrentLockManager.class.getName()).addHandler(new ConsoleHandler());
+ }
+
+ Map<Integer, ITransactionContext> jobMap;
+ ILockManager lockMgr;
+
+ // set to e.g. System.err to get some output
+ PrintStream out = System.out;
+ PrintStream err = null; //System.err;
+
+ //--------------------------------------------------------------------
+ // JUnit methods
+ //--------------------------------------------------------------------
+
+ @Before
+ public void setUp() throws Exception {
+ jobMap = new HashMap<>();
+ lockMgr = new ConcurrentLockManager(LOCK_MGR_SHRINK_TIMER, LOCK_MGR_ARENAS, LOCK_MGR_TABLE_SIZE);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ lockMgr = null;
+ jobMap = null;
+ }
+
+ @Test
+ public void testSimpleSharedUnlock() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.UNLOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.UNLOCK, j(1), d(1), e(-1), LockMode.IS));
+ reportErrors(execute(reqs));
+ }
+
+ @Test
+ public void testSimpleSharedRelease() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.RELEASE, j(1)));
+ reportErrors(execute(reqs));
+ }
+
+ @Test
+ public void testReacquire() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.RELEASE, j(1)));
+ reportErrors(execute(reqs));
+ }
+
+ @Test
+ public void testInstant() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.INSTANT_LOCK, j(1), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.PRINT));
+ reqs.add(req(Kind.INSTANT_LOCK, j(3), d(1), e(1), LockMode.S));
+ expectError(execute(reqs), j(3), WaitInterruptedException.class);
+ }
+
+ @Test
+ public void testTry() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.TRY_LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.PRINT));
+ reqs.add(req(Kind.TRY_LOCK, j(3), d(1), e(1), LockMode.X));
+ reportErrors(execute(reqs));
+ }
+
+ @Test
+ public void testInstantTry() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.INSTANT_LOCK, j(1), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.PRINT));
+ reqs.add(req(Kind.INSTANT_TRY_LOCK, j(3), d(1), e(1), LockMode.S));
+ reportErrors(execute(reqs));
+ }
+
+ @Test
+ public void testDeadlock() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(2), LockMode.X));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(2), LockMode.X));
+ reqs.add(req(Kind.RELEASE, j(1)));
+ reqs.add(req(Kind.RELEASE, j(2)));
+ expectError(execute(reqs), j(1), ACIDException.class);
+ }
+
+ @Test
+ public void testUpgrade() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.RELEASE, j(1)));
+ reportErrors(execute(reqs));
+ }
+
+ @Test
+ public void testUpgradeDeadlock() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.PRINT));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.RELEASE, j(1)));
+ reqs.add(req(Kind.RELEASE, j(2)));
+ expectError(execute(reqs), j(2), ACIDException.class);
+ }
+
+ @Test
+ /**
+ * Runs into a time-out and j(1) gets interrupted by
+ * the test. This scenario happens only in this test as there
+ * is additional synchronization between the locking threads
+ * through the coordinator.
+ */
+ public void testTimeout() throws Exception {
+ List<Request> reqs = new ArrayList<>();
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(2), d(1), e(1), LockMode.S));
+ reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.X));
+ reqs.add(req(Kind.RELEASE, j(1)));
+ reqs.add(req(Kind.RELEASE, j(2)));
+ // this runs into a time-out and j(1) gets interrupted
+ expectError(execute(reqs), j(1), WaitInterruptedException.class);
+ }
+
+ //--------------------------------------------------------------------
+ // Helper methods
+ //--------------------------------------------------------------------
+
+ /**
+ * Executes a list of requests where
+ * a) each job runs in a different thread and
+ * b) the threads/jobs are synchronized
+ * The synchronization ensures that the requests are submitted to the
+ * LockManager in list order, however they are fulfilled in the order
+ * decided by the LockManager
+ *
+ * @param reqs a list of requests that will be execute in order
+ * @return a map of (JodId, exception) pairs that can either be handled
+ * by the test or thrown using #reportErrors
+ */
+ private Map<String, Throwable> execute(List<Request> reqs) throws InterruptedException {
+ if (err != null) {
+ err.println("*** start ***");
+ }
+ final AtomicInteger timeStamp = new AtomicInteger(INITIAL_TIMESTAMP);
+ Set<Locker> lockers = createLockers(reqs, timeStamp);
+ Map<String, Thread> threads = startThreads(lockers);
+
+ int coordinatorTime = timeStamp.get();
+ while (active(lockers)) {
+ if (err != null) {
+ err.println("coordinatorTime = " + coordinatorTime);
+ }
+ if (coordinatorTime == timeStamp.get()) {
+ Thread.sleep(COORDINATOR_SLEEP);
+ if (coordinatorTime == timeStamp.get()) {
+ Locker timedOut = timedOut(lockers);
+ if (timedOut != null) {
+ if (err != null) {
+ err.println(timedOut.name + " timed out");
+ }
+ break;
+ }
+ }
+ }
+ coordinatorTime = timeStamp.get();
+ }
+ Map<String, Throwable> result = stopThreads(lockers, threads);
+ return result;
+ }
+
+ private boolean active(Set<Locker> lockers) {
+ for (Locker locker : lockers) {
+ if (locker.active()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Locker timedOut(Set<Locker> lockers) {
+ for (Locker locker : lockers) {
+ if (locker.timedOut()) {
+ return locker;
+ }
+ }
+ return null;
+ }
+
+ private Set<Locker> createLockers(List<Request> reqs, AtomicInteger timeStamp) {
+ Set<Locker> lockers = new HashSet<>();
+ lockers.add(new Locker(lockMgr, null, reqs, timeStamp, err));
+ for (ITransactionContext txnCtx : jobMap.values()) {
+ Locker locker = new Locker(lockMgr, txnCtx, reqs, timeStamp, err);
+ lockers.add(locker);
+ }
+ return lockers;
+ }
+
+ private Map<String, Thread> startThreads(Set<Locker> lockers) {
+ Map<String, Thread> threads = new HashMap<>(lockers.size());
+ for (Locker locker : lockers) {
+ Thread t = new Thread(locker, locker.name);
+ threads.put(locker.name, t);
+ t.start();
+ }
+ return threads;
+ }
+
+ private Map<String, Throwable> stopThreads(Set<Locker> lockers, Map<String, Thread> threads) throws
+ InterruptedException {
+ Map<String, Throwable> result = new HashMap<>();
+ for (Locker locker : lockers) {
+ stopThread(threads.get(locker.name));
+ List<Throwable> errors = locker.getErrors();
+ if (errors != null) {
+ errors.forEach(error -> result.put(locker.name, error));
+ }
+ }
+ return result;
+ }
+
+ private void stopThread(Thread t) throws InterruptedException {
+ if (err != null) {
+ err.println("stopping " + t.getName() + " " + t.getState());
+ }
+ boolean done = false;
+ while (!done) {
+ switch (t.getState()) {
+ case NEW:
+ case RUNNABLE:
+ case TERMINATED:
+ done = true;
+ break;
+ default:
+ if (err != null) {
+ err.println("interrupting " + t.getName());
+ }
+ t.interrupt();
+ }
+ }
+ if (err != null) {
+ err.println("joining " + t.getName());
+ }
+ t.join();
+ }
+
+ /**
+ * throws the first Throwable found in the map.
+ * This is the default way to handle the errors returned by #execute
+ *
+ * @param errors a map of (JodId, exception) pairs
+ */
+ void reportErrors(Map<String, Throwable> errors) {
+ for (String name : errors.keySet()) {
+ throw new AssertionError("job " + name + " caught something", errors.get(name));
+ }
+ out.println("no errors");
+ }
+
+ void printErrors(Map<String, Throwable> errors) {
+ errors.keySet().forEach(name -> out.println("Thread " + name + " caught " + errors.get(name)));
+ }
+
+ /**
+ * gets the error for a specific job from the errors map
+ *
+ * @param errors a map of (JodId, throwable) pairs
+ * @param txnCtx the transaction context of the job whose error is requested
+ * @return throwable for said error
+ */
+ private static Throwable getError(Map<String, Throwable> errors, ITransactionContext txnCtx) {
+ return errors.get(txnCtx.getJobId().toString());
+ }
+
+ /**
+ * asserts that the error for a specific job from the errors map is of a specific class
+ *
+ * @param errors a map of (JodId, throwable) pairs
+ * @param txnCtx the transaction context of the job whose error is requested
+ * @param clazz the exception class
+ */
+ private void expectError(Map<String, Throwable> errors, ITransactionContext txnCtx,
+ Class<? extends Throwable> clazz) throws Exception {
+ Throwable error = getError(errors, txnCtx);
+ if (error == null) {
+ throw new AssertionError("expected " + clazz.getSimpleName() + " for " + txnCtx.getJobId() + ", got no " +
+ "exception");
+ }
+ if (!clazz.isInstance(error)) {
+ throw new AssertionError(error);
+ }
+ out.println("caught expected " + error);
+ }
+
+ //--------------------------------------------------------------------
+ // Convenience methods to make test description more compact
+ //--------------------------------------------------------------------
+
+ private Request req(final Kind kind, final ITransactionContext txnCtx,
+ final DatasetId dsId, final int hashValue, final byte lockMode) {
+ return Request.create(kind, txnCtx, dsId, hashValue, lockMode);
+ }
+
+ private Request req(final Kind kind, final ITransactionContext txnCtx) {
+ return Request.create(kind, txnCtx);
+ }
+
+ private Request req(final Kind kind) {
+ return Request.create(kind, out);
+ }
+
+ private static DatasetId d(int id) {
+ return new DatasetId(id);
+ }
+
+ private static int e(int i) {
+ return i;
+ }
+
+ private ITransactionContext j(int jId) {
+ if (!jobMap.containsKey(jId)) {
+ ITransactionContext mockTxnContext = mock(ITransactionContext.class);
+ when(mockTxnContext.getJobId()).thenReturn(new JobId(jId));
+ jobMap.put(jId, mockTxnContext);
+ }
+ return jobMap.get(jId);
+ }
+
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
new file mode 100644
index 0000000..164fc07
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Locker.java
@@ -0,0 +1,187 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.junit.Assert;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Executes a sequence of lock requests against an ILockManager.
+ * Lockers are run by different threads in the LockManagerUnitTest.
+ *
+ * @see ILockManager
+ * @see LockManagerUnitTest
+ */
+class Locker implements Runnable {
+
+ public String name;
+
+ private ILockManager lockMgr;
+
+ private List<Requester> requests;
+ private Iterator<Requester> reqIter;
+ private volatile Requester curReq;
+ private int reqStart;
+
+ private AtomicInteger globalTime;
+ private List<Throwable> errors;
+
+ private PrintStream err;
+
+ /**
+ * @param lockMgr the ILockManager to send requests to
+ * @param txnCtx the ITransactionContext that identifies the transaction that this Locker represents
+ * @param allRequests an ordered list of lock requests for multiple transactions, this Locker will only execute
+ * requests for the transaction identified by txnCtx
+ * @param time a global timestamp that is used to synchronize different lockers to ensure that requests are started
+ * in the order given in allRequests
+ * @param err a stream to write log/error information to
+ *
+ * @see Request
+ */
+ Locker(ILockManager lockMgr, ITransactionContext txnCtx, List<Request> allRequests, AtomicInteger time,
+ PrintStream err) {
+ this.name = txnCtx == null ? "admin" : txnCtx.getJobId().toString();
+ this.lockMgr = lockMgr;
+
+ this.requests = new LinkedList<>();
+ for (int pos = 0; pos < allRequests.size(); ++pos) {
+ Request req = allRequests.get(pos);
+ if (req.txnCtx == txnCtx) {
+ requests.add(new Requester(pos, req));
+ }
+ }
+ this.reqIter = requests.iterator();
+ this.globalTime = time;
+ this.err = err;
+ }
+
+ private boolean hasErrors() {
+ return errors != null && errors.size() > 0;
+ }
+
+ synchronized List<Throwable> getErrors() {
+ return errors;
+ }
+
+ private synchronized void addError(Throwable error) {
+ log("caught " + error);
+ if (this.errors == null) {
+ this.errors = Collections.synchronizedList(new ArrayList<Throwable>());
+ }
+ this.errors.add(error);
+ }
+
+ public synchronized boolean active() {
+ return !hasErrors() && (reqIter.hasNext() || curReq != null);
+ }
+
+ public synchronized boolean timedOut() {
+ return reqStart > 0 && (currentTime() - reqStart) > LockManagerUnitTest.TIMEOUT_MS;
+ }
+
+ @Override
+ public void run() {
+ log("running");
+ try {
+ while (! hasErrors() && reqIter.hasNext()) {
+ curReq = reqIter.next();
+ int localTime = globalTime.get();
+ while (localTime < curReq.time) {
+ Thread.sleep(10);
+ localTime = globalTime.get();
+ }
+ if (localTime != curReq.time) {
+ throw new AssertionError("missed time for request " + curReq);
+ }
+ log("will exec at t=" + localTime + " " + curReq);
+ try {
+ reqStart = currentTime();
+ Assert.assertEquals(localTime, globalTime.getAndIncrement());
+ log("incremented");
+ curReq.setResult(curReq.request.execute(lockMgr) ? Requester.SUCCESS : Requester.FAIL);
+ } catch (ACIDException e) {
+ curReq.setResult(Requester.ERROR);
+ addError(e);
+ } finally {
+ reqStart = -1;
+ }
+ log("time " + localTime);
+ }
+ curReq = null;
+ } catch (InterruptedException ie) {
+ log("got interrupted");
+ } catch (Throwable e) {
+ if (err != null) {
+ e.printStackTrace(err);
+ }
+ addError(e);
+ }
+ log("done");
+ }
+
+ private void log(String msg) {
+ if (err != null) {
+ err.println(Thread.currentThread().getName() + " " + msg);
+ }
+ }
+
+ private static int currentTime() {
+ return ((int) System.currentTimeMillis()) & 0x7fffffff;
+ }
+
+ public String toString() {
+ return "[" + name + "]" + curReq;
+ }
+}
+
+class Requester {
+
+ public static byte NONE = -1;
+ public static byte FAIL = 0;
+ public static byte SUCCESS = 1;
+ public static byte ERROR = 2;
+
+ int time;
+ Request request;
+ byte result = NONE;
+
+ Requester(int time, Request request) {
+ this.time = time;
+ this.request = request;
+ }
+
+ void setResult(byte res) {
+ result = res;
+ }
+
+ public String toString() {
+ return request.toString() + " t=" + time;
+ }
+}
diff --git a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
new file mode 100644
index 0000000..c7e0c42
--- /dev/null
+++ b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/Request.java
@@ -0,0 +1,166 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.transaction.management.service.locking;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+
+import java.io.PrintStream;
+
+/**
+ * repesents a lock request for testing.
+ */
+abstract class Request {
+ /** the kind of a request */
+ enum Kind {
+ /** requests an instant-try-lock */
+ INSTANT_TRY_LOCK,
+ /** requests an instant-lock */
+ INSTANT_LOCK,
+ /** requests a lock */
+ LOCK,
+ /** prints a JSON representation of the lock table by entity */
+ PRINT,
+ /** releases all locks */
+ RELEASE,
+ /** requests a try-lock */
+ TRY_LOCK,
+ /** unlocks a lock */
+ UNLOCK
+ }
+
+ Kind kind;
+ ITransactionContext txnCtx;
+
+ Request(Kind kind, ITransactionContext txnCtx) {
+ this.kind = kind;
+ this.txnCtx = txnCtx;
+ }
+
+ String asString(final Kind kind, final ITransactionContext txnCtx,
+ final DatasetId dsId, final int hashValue, final byte lockMode) {
+ return txnCtx.getJobId().toString() + ":" + kind.name() + ":" + dsId.getId() + ":" + hashValue + ":"
+ + TransactionManagementConstants.LockManagerConstants.LockMode.toString(lockMode);
+ }
+
+ abstract boolean execute(ILockManager lockMgr) throws ACIDException;
+
+ static Request create(final Kind kind, final ITransactionContext txnCtx,
+ final DatasetId dsId, final int hashValue, final byte lockMode) {
+ switch (kind) {
+ case INSTANT_TRY_LOCK:
+ return new Request(kind, txnCtx) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ return lockMgr.instantTryLock(dsId, hashValue, lockMode, txnCtx);
+ }
+
+ public String toString() {
+ return asString(kind, txnCtx, dsId, hashValue, lockMode);
+ }
+ };
+ case INSTANT_LOCK:
+ return new Request(kind, txnCtx) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ lockMgr.instantLock(dsId, hashValue, lockMode, txnCtx);
+ return true;
+ }
+
+ public String toString() {
+ return asString(kind, txnCtx, dsId, hashValue, lockMode);
+ }
+ };
+ case LOCK:
+ return new Request(kind, txnCtx) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ lockMgr.lock(dsId, hashValue, lockMode, txnCtx);
+ return true;
+ }
+
+ public String toString() {
+ return asString(kind, txnCtx, dsId, hashValue, lockMode);
+ }
+ };
+ case TRY_LOCK:
+ return new Request(kind, txnCtx) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ return lockMgr.tryLock(dsId, hashValue, lockMode, txnCtx);
+ }
+
+ public String toString() {
+ return asString(kind, txnCtx, dsId, hashValue, lockMode);
+ }
+ };
+ case UNLOCK:
+ return new Request(kind, txnCtx) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ lockMgr.unlock(dsId, hashValue, lockMode, txnCtx);
+ return true;
+ }
+
+ public String toString() {
+ return asString(kind, txnCtx, dsId, hashValue, lockMode);
+ }
+ };
+ default:
+ }
+ throw new AssertionError("Illegal Request Kind " + kind);
+ }
+
+ static Request create(final Kind kind, final ITransactionContext txnCtx) {
+ if (kind == Kind.RELEASE) {
+ return new Request(kind, txnCtx) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ lockMgr.releaseLocks(txnCtx);
+ return true;
+ }
+
+ public String toString() {
+ return txnCtx.getJobId().toString() + ":" + kind.name();
+ }
+ };
+ }
+ throw new AssertionError("Illegal Request Kind " + kind);
+ }
+
+ static Request create(final Kind kind, final PrintStream out) {
+ if (kind == Kind.PRINT) {
+ return new Request(kind, null) {
+ boolean execute(ILockManager lockMgr) throws ACIDException {
+ if (out == null) {
+ return false;
+ }
+ if (!(lockMgr instanceof ConcurrentLockManager)) {
+ out.print("cannot print");
+ return false;
+ }
+ out.print(((ConcurrentLockManager) lockMgr).printByResource());
+ return true;
+ }
+
+ public String toString() {
+ return kind.name();
+ }
+ };
+ }
+ throw new AssertionError("Illegal Request Kind " + kind);
+ }
+}