introduce LockMode.ANY to indicate that the first lock will be unlocked during unlock
synchonize on jobArenaManager in releaseLocks
get all latches in ConcurrentLockManager.append to ensure a consistent snapshot
some cleanup
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index dbe7e82..410f207 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -61,6 +61,7 @@
/**
* @param datasetId
* @param entityHashValue
+ * @param lockMode
* @param txnContext
* @throws ACIDException
* TODO
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 44bc607..96665ba 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -40,7 +40,6 @@
*
* @author tillw
*/
-
public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
@@ -48,7 +47,6 @@
public static final boolean IS_DEBUG_MODE = false;//true
private TransactionSubsystem txnSubsystem;
-
private ResourceGroupTable table;
private ResourceArenaManager resArenaMgr;
private RequestArenaManager reqArenaMgr;
@@ -159,7 +157,7 @@
private void enqueueWaiter(final ResourceGroup group, final long reqSlot,
final long resSlot, final long jobSlot, final LockAction act,
ITransactionContext txnContext) throws ACIDException {
- final Queue queue = act.modify ? Upgrader : Waiter;
+ final Queue queue = act.modify ? upgrader : waiter;
if (! introducesDeadlock(resSlot, jobSlot)) {
queue.add(reqSlot, resSlot, jobSlot);
} else {
@@ -244,14 +242,12 @@
final long jobSlot = findOrAllocJobSlot(jobId);
- boolean locked = false;
- while (! locked) {
+ while (true) {
final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
case GET:
- locked = true;
- break;
+ return;
case WAIT:
case CONV:
if (reqSlot == -1) {
@@ -387,12 +383,10 @@
ResourceGroup group = table.get(datasetId, entityHashValue);
group.getLatch();
-
try {
int dsId = datasetId.getId();
long resource = findResourceInGroup(group, dsId, entityHashValue);
-
if (resource < 0) {
throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
}
@@ -435,7 +429,7 @@
@Override
public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
- log("releaseLocks", -1, -1, LockMode.NL, txnContext);
+ log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
int jobId = txnContext.getJobId().getId();
Long jobSlot = jobIdSlotMap.get(jobId);
@@ -443,16 +437,17 @@
// we don't know the job, so there are no locks for it - we're done
return;
}
- long holder = jobArenaMgr.getLastHolder(jobSlot);
- while (holder != -1) {
- long resource = reqArenaMgr.getResourceId(holder);
- int dsId = resArenaMgr.getDatasetId(resource);
- int pkHashVal = resArenaMgr.getPkHashVal(resource);
- unlock(new DatasetId(dsId), pkHashVal, LockMode.NL, txnContext);
- holder = jobArenaMgr.getLastHolder(jobSlot);
- }
- jobArenaMgr.deallocate(jobSlot);
-
+ synchronized (jobArenaMgr) {
+ long holder = jobArenaMgr.getLastHolder(jobSlot);
+ while (holder != -1) {
+ long resource = reqArenaMgr.getResourceId(holder);
+ int dsId = resArenaMgr.getDatasetId(resource);
+ int pkHashVal = resArenaMgr.getPkHashVal(resource);
+ unlock(new DatasetId(dsId), pkHashVal, LockMode.ANY, txnContext);
+ holder = jobArenaMgr.getLastHolder(jobSlot);
+ }
+ jobArenaMgr.deallocate(jobSlot);
+ }
//System.err.println(table.append(new StringBuilder(), true).toString());
//System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
//System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
@@ -589,7 +584,7 @@
private boolean requestMatches(long holder, long jobSlot, byte lockMode) {
return jobSlot == reqArenaMgr.getJobSlot(holder)
- && (lockMode == LockMode.NL
+ && (lockMode == LockMode.ANY
|| lockMode == reqArenaMgr.getLockMode(holder));
}
@@ -612,7 +607,7 @@
void remove(long request, long resource, long job);
}
- Queue Waiter = new Queue() {
+ final Queue waiter = new Queue() {
public void add(long request, long resource, long job) {
long waiter = resArenaMgr.getFirstWaiter(resource);
reqArenaMgr.setNextRequest(request, -1);
@@ -643,7 +638,7 @@
}
};
- Queue Upgrader = new Queue() {
+ final Queue upgrader = new Queue() {
public void add(long request, long resource, long job) {
long upgrader = resArenaMgr.getFirstUpgrader(resource);
reqArenaMgr.setNextRequest(request, -1);
@@ -691,12 +686,6 @@
reqArenaMgr.setNextRequest(head, appendee);
}
- /**
- *
- * @param head
- * @param reqSlot
- * @return
- */
private long removeRequestFromQueueForSlot(long head, long reqSlot) {
long cur = head;
long prev = cur;
@@ -808,30 +797,34 @@
}
public StringBuilder append(StringBuilder sb) {
- sb.append(">>dump_begin\t>>----- [resTable] -----\n");
- table.append(sb);
- sb.append(">>dump_end\t>>----- [resTable] -----\n");
+ table.getAllLatches();
+ try {
+ sb.append(">>dump_begin\t>>----- [resTable] -----\n");
+ table.append(sb);
+ sb.append(">>dump_end\t>>----- [resTable] -----\n");
- sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
- resArenaMgr.append(sb);
- sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
-
- sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
- reqArenaMgr.append(sb);
- sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
-
- sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
- for(Integer i : jobIdSlotMap.keySet()) {
- sb.append(i).append(" : ");
- TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
- sb.append("\n");
+ sb.append(">>dump_begin\t>>----- [resArenaMgr] -----\n");
+ resArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [resArenaMgr] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [reqArenaMgr] -----\n");
+ reqArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [reqArenaMgr] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [jobIdSlotMap] -----\n");
+ for(Integer i : jobIdSlotMap.keySet()) {
+ sb.append(i).append(" : ");
+ TypeUtil.Global.append(sb, jobIdSlotMap.get(i));
+ sb.append("\n");
+ }
+ sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
+
+ sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
+ jobArenaMgr.append(sb);
+ sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
+ } finally {
+ table.releaseAllLatches();
}
- sb.append(">>dump_end\t>>----- [jobIdSlotMap] -----\n");
-
- sb.append(">>dump_begin\t>>----- [jobArenaMgr] -----\n");
- jobArenaMgr.append(sb);
- sb.append(">>dump_end\t>>----- [jobArenaMgr] -----\n");
-
return sb;
}
@@ -921,6 +914,18 @@
return table[h % TABLE_SIZE];
}
+ public void getAllLatches() {
+ for (int i = 0; i < TABLE_SIZE; ++i) {
+ table[i].getLatch();
+ }
+ }
+
+ public void releaseAllLatches() {
+ for (int i = 0; i < TABLE_SIZE; ++i) {
+ table[i].releaseLatch();
+ }
+ }
+
public StringBuilder append(StringBuilder sb) {
return append(sb, false);
}
@@ -979,10 +984,9 @@
}
void log(String s) {
- if (! LOGGER.isLoggable(Level.FINEST)) {
- return;
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(s + " " + toString());
}
- LOGGER.finest(s + " " + toString());
}
public String toString() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index b17a787..98b16c0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -278,7 +278,7 @@
did = entityInfoManager.getDatasetId(entityInfo);
entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
if (did == datasetId.getId() && entityHashValue != -1) {
- this.unlock(datasetId, entityHashValue, LockMode.NL, txnContext);
+ this.unlock(datasetId, entityHashValue, LockMode.ANY, txnContext);
}
entityInfo = prevEntityInfo;
@@ -2211,7 +2211,7 @@
tempDatasetIdObj.setId(logRecord.getDatasetId());
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
- unlock(tempDatasetIdObj, logRecord.getPKHashValue(), LockMode.NL, txnCtx);
+ unlock(tempDatasetIdObj, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
tempJobIdObj.setId(logRecord.getJobId());
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index e14a0f8..1ed6fba 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -203,7 +203,7 @@
dsId.setId(logRecord.getDatasetId());
jId.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
- txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), LockMode.NL, txnCtx);
+ txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
jId.setId(logRecord.getJobId());
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index c17689b..91f2535 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -31,11 +31,12 @@
public static class LockManagerConstants {
public static class LockMode {
- public static final byte NL = 0;
- public static final byte IS = 1;
- public static final byte IX = 2;
- public static final byte S = 3;
- public static final byte X = 4;
+ public static final byte ANY = -1;
+ public static final byte NL = 0;
+ public static final byte IS = 1;
+ public static final byte IX = 2;
+ public static final byte S = 3;
+ public static final byte X = 4;
public static byte intentionMode(byte mode) {
switch (mode) {
@@ -48,15 +49,15 @@
public static String toString(byte mode) {
switch (mode) {
- case NL: return "NL";
- case IS: return "IS";
- case IX: return "IX";
- case S: return "S";
- case X: return "X";
- default: throw new IllegalArgumentException("no such lock mode");
+ case ANY: return "ANY";
+ case NL: return "NL";
+ case IS: return "IS";
+ case IX: return "IX";
+ case S: return "S";
+ case X: return "X";
+ default: throw new IllegalArgumentException("no such lock mode");
}
}
}
}
-
}