Merge branch 'master' into westmann/locks
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
index 79221c1..13a80f1 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/java/edu/uci/ics/asterix/recordmanagergenerator/RecordManagerGeneratorMojo.java
@@ -67,7 +67,7 @@
String recordManagerTemplate = "RecordManager.java";
String arenaManagerTemplate = "ArenaManager.java";
- String[] supportTemplates = { "Stats.java", "AllocInfo.java", "TypeUtil.java" };
+ String[] supportTemplates = { "RecordManagerStats.java", "AllocInfo.java", "TypeUtil.java" };
private Map<String, RecordType> typeMap;
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
index 4abb3af..6df032e 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/ArenaManager.java
@@ -114,7 +114,7 @@
return append(new StringBuilder()).toString();
}
- public Stats addTo(Stats s) {
+ public RecordManagerStats addTo(RecordManagerStats s) {
s.arenas += noArenas;
for (int i = 0; i < noArenas; ++i) {
arenas[i].addTo(s);
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
index 9afc673..39a344e 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManager.java
@@ -206,7 +206,7 @@
return append(new StringBuilder()).toString();
}
- public Stats addTo(Stats s) {
+ public RecordManagerStats addTo(RecordManagerStats s) {
final int size = buffers.size();
s.buffers += size;
s.slots += size * NO_SLOTS;
@@ -302,7 +302,7 @@
return append(new StringBuilder()).toString();
}
- public void addTo(Stats s) {
+ public void addTo(RecordManagerStats s) {
if (isInitialized()) {
s.items += occupiedSlots;
}
diff --git a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManagerStats.java
similarity index 93%
rename from asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
rename to asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManagerStats.java
index c136101..5842ed6 100644
--- a/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/Stats.java
+++ b/asterix-maven-plugins/record-manager-generator-maven-plugin/src/main/resources/RecordManagerStats.java
@@ -1,6 +1,6 @@
package @PACKAGE@;
-public class Stats {
+public class RecordManagerStats {
int arenas = 0;
int buffers = 0;
int slots = 0;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 97c8235..9e6272e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -383,6 +383,7 @@
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
.getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
index.getDatasetId().getId());
+ final String path = file.getFile().getPath();
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(
virtualBufferCaches,
@@ -405,11 +406,11 @@
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, file.getFile()
- .getPath(), 0));
+ localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
indexLifecycleManager.register(resourceID, lsmBtree);
} else {
- resourceID = localResourceRepository.getResourceByName(file.getFile().getPath()).getResourceId();
+ final LocalResource resource = localResourceRepository.getResourceByName(path);
+ resourceID = resource.getResourceId();
lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index 9957edf..8de7ca5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -36,9 +36,8 @@
@Override
public boolean proceed(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- return lockManager.tryLock(datasetId, pkHash, LockMode.S, txnCtx);
+ return lockManager.tryLock(datasetId, -1, LockMode.S, txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
@@ -46,9 +45,8 @@
@Override
public void reconcile(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx);
+ lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
@@ -56,12 +54,7 @@
@Override
public void cancel(ITupleReference tuple) throws HyracksDataException {
- int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
- try {
- lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
- } catch (ACIDException e) {
- throw new HyracksDataException(e);
- }
+ //no op
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index 6c6b65f..51307e6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -42,9 +42,11 @@
*/
public class ConcurrentLockManager implements ILockManager, ILifeCycleComponent {
- private static final Logger LOGGER = Logger.getLogger(ConcurrentLockManager.class.getName());
-
- public static final boolean IS_DEBUG_MODE = false;//true
+ private static final Logger LOGGER
+ = Logger.getLogger(ConcurrentLockManager.class.getName());
+ private static final Level LVL = Level.FINER;
+
+ public static final boolean DEBUG_MODE = false;//true
private TransactionSubsystem txnSubsystem;
private ResourceGroupTable table;
@@ -53,11 +55,13 @@
private JobArenaManager jobArenaMgr;
private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
private ThreadLocal<DatasetLockCache> dsLockCache;
-
+
+ private LockManagerStats stats = new LockManagerStats(10000);
+
enum LockAction {
ERR(false, false),
GET(false, false),
- UPD(false, true), // special version of GET that updates the max lock mode
+ UPD(false, true), // version of GET that updates the max lock mode
WAIT(true, false),
CONV(true, true) // convert (upgrade) a lock (e.g. from S to X)
;
@@ -107,22 +111,22 @@
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("lock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ stats.lock();
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- lock(datasetId, -1, dsLockMode, txnContext);
- dsLockCache.get().put(jobId, dsId, dsLockMode);
+ lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
+ } else {
+ if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
+ return;
}
}
final long jobSlot = findOrAllocJobSlot(jobId);
- final ResourceGroup group = table.get(datasetId, entityHashValue);
+ final ResourceGroup group = table.get(dsId, entityHashValue);
group.getLatch();
try {
validateJob(txnContext);
@@ -149,6 +153,9 @@
throw new IllegalStateException();
}
}
+ if (entityHashValue == -1) {
+ dsLockCache.get().put(jobId, dsId, lockMode);
+ }
} finally {
group.releaseLatch();
}
@@ -208,20 +215,18 @@
public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("instantLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ stats.instantLock();
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- lock(datasetId, -1, dsLockMode, txnContext);
- dsLockCache.get().put(jobId, dsId, dsLockMode);
- }
+ lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
+ } else {
+ throw new UnsupportedOperationException("instant locks are not supported on datasets");
}
- final ResourceGroup group = table.get(datasetId, entityHashValue);
+ final ResourceGroup group = table.get(dsId, entityHashValue);
if (group.firstResourceIndex.get() == -1l) {
validateJob(txnContext);
// if we do not have a resource in the group, we know that the
@@ -265,6 +270,7 @@
} finally {
if (reqSlot != -1) {
// deallocate request, if we allocated one earlier
+ if (DEBUG_MODE) LOGGER.finer("del req slot " + TypeUtil.Global.toString(reqSlot));
reqArenaMgr.deallocate(reqSlot);
}
group.releaseLatch();
@@ -275,24 +281,24 @@
public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
log("tryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ stats.tryLock();
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
- return false;
- }
- dsLockCache.get().put(jobId, dsId, dsLockMode);
+ if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+ return false;
+ }
+ } else {
+ if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
+ return true;
}
}
final long jobSlot = findOrAllocJobSlot(jobId);
- final ResourceGroup group = table.get(datasetId, entityHashValue);
+ final ResourceGroup group = table.get(dsId, entityHashValue);
group.getLatch();
try {
@@ -300,7 +306,7 @@
final long resSlot = findOrAllocResourceSlot(group, dsId, entityHashValue);
final long reqSlot = allocRequestSlot(resSlot, jobSlot, lockMode);
-
+
final LockAction act = determineLockAction(resSlot, jobSlot, lockMode);
switch (act) {
case UPD:
@@ -308,6 +314,9 @@
// no break
case GET:
addHolder(reqSlot, resSlot, jobSlot);
+ if (entityHashValue == -1) {
+ dsLockCache.get().put(jobId, dsId, lockMode);
+ }
return true;
case WAIT:
case CONV:
@@ -318,7 +327,7 @@
} finally {
group.releaseLatch();
}
-
+
// if we did acquire the dataset lock, but not the entity lock, we keep
// it anyway and clean it up at the end of the job
}
@@ -327,22 +336,20 @@
public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
ITransactionContext txnContext) throws ACIDException {
log("instantTryLock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ stats.instantTryLock();
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
if (entityHashValue != -1) {
- // get the intention lock on the dataset, if we want to lock an individual item
- final byte dsLockMode = LockMode.intentionMode(lockMode);
- if (! dsLockCache.get().contains(jobId, dsId, dsLockMode)) {
- if (! tryLock(datasetId, -1, dsLockMode, txnContext)) {
- return false;
- }
- dsLockCache.get().put(jobId, dsId, dsLockMode);
+ if (! tryLock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext)) {
+ return false;
}
+ } else {
+ throw new UnsupportedOperationException("instant locks are not supported on datasets");
}
- final ResourceGroup group = table.get(datasetId, entityHashValue);
+ final ResourceGroup group = table.get(dsId, entityHashValue);
if (group.firstResourceIndex.get() == -1l) {
validateJob(txnContext);
// if we do not have a resource in the group, we know that the
@@ -382,23 +389,29 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
log("unlock", datasetId.getId(), entityHashValue, lockMode, txnContext);
+ final int jobId = txnContext.getJobId().getId();
+ final long jobSlot = jobIdSlotMap.get(jobId);
+ final int dsId = datasetId.getId();
+ unlock(dsId, entityHashValue, lockMode, jobSlot);
+ }
- ResourceGroup group = table.get(datasetId, entityHashValue);
+ private void unlock(int dsId, int entityHashValue, byte lockMode, long jobSlot) throws ACIDException {
+ log("unlock", dsId, entityHashValue, lockMode, null);
+ stats.unlock();
+
+ ResourceGroup group = table.get(dsId, entityHashValue);
group.getLatch();
try {
- int dsId = datasetId.getId();
long resource = findResourceInGroup(group, dsId, entityHashValue);
if (resource < 0) {
throw new IllegalStateException("resource (" + dsId + ", " + entityHashValue + ") not found");
}
- int jobId = txnContext.getJobId().getId();
- long jobSlot = findOrAllocJobSlot(jobId);
-
long holder = removeLastHolder(resource, jobSlot, lockMode);
// deallocate request
+ if (DEBUG_MODE) LOGGER.finer("del req slot " + TypeUtil.Global.toString(holder));
reqArenaMgr.deallocate(holder);
// deallocate resource or fix max lock mode
if (resourceNotUsed(resource)) {
@@ -411,6 +424,7 @@
}
resArenaMgr.setNext(prev, resArenaMgr.getNext(resource));
}
+ if (DEBUG_MODE) LOGGER.finer("del res slot " + TypeUtil.Global.toString(resource));
resArenaMgr.deallocate(resource);
} else {
final int oldMaxMode = resArenaMgr.getMaxMode(resource);
@@ -432,6 +446,7 @@
@Override
public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
+ stats.releaseLocks();
int jobId = txnContext.getJobId().getId();
Long jobSlot = jobIdSlotMap.get(jobId);
@@ -439,6 +454,12 @@
// we don't know the job, so there are no locks for it - we're done
return;
}
+ //System.err.println(table.append(new StringBuilder(), true).toString());
+ if (LOGGER.isLoggable(LVL)) {
+ LOGGER.log(LVL, "jobArenaMgr " + jobArenaMgr.addTo(new RecordManagerStats()).toString());
+ LOGGER.log(LVL, "resArenaMgr " + resArenaMgr.addTo(new RecordManagerStats()).toString());
+ LOGGER.log(LVL, "reqArenaMgr " + reqArenaMgr.addTo(new RecordManagerStats()).toString());
+ }
long holder;
synchronized (jobArenaMgr) {
holder = jobArenaMgr.getLastHolder(jobSlot);
@@ -447,28 +468,30 @@
long resource = reqArenaMgr.getResourceId(holder);
int dsId = resArenaMgr.getDatasetId(resource);
int pkHashVal = resArenaMgr.getPkHashVal(resource);
- unlock(new DatasetId(dsId), pkHashVal, LockMode.ANY, txnContext);
+ unlock(dsId, pkHashVal, LockMode.ANY, jobSlot);
synchronized (jobArenaMgr) {
holder = jobArenaMgr.getLastHolder(jobSlot);
}
}
+ if (DEBUG_MODE) LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot));
jobArenaMgr.deallocate(jobSlot);
- //System.err.println(table.append(new StringBuilder(), true).toString());
- //System.out.println("jobArenaMgr " + jobArenaMgr.addTo(new Stats()).toString());
- //System.out.println("resArenaMgr " + resArenaMgr.addTo(new Stats()).toString());
- //System.out.println("reqArenaMgr " + reqArenaMgr.addTo(new Stats()).toString());
+ jobIdSlotMap.remove(jobId);
+ stats.logCounters(LOGGER, Level.INFO, true);
+ //LOGGER.info(toString());
}
private long findOrAllocJobSlot(int jobId) {
Long jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
jobSlot = new Long(jobArenaMgr.allocate());
+ if (DEBUG_MODE) LOGGER.finer("new job slot " + TypeUtil.Global.toString(jobSlot) + " (" + jobId + ")");
jobArenaMgr.setJobId(jobSlot, jobId);
Long oldSlot = jobIdSlotMap.putIfAbsent(jobId, jobSlot);
if (oldSlot != null) {
// if another thread allocated a slot for this jobId between
// get(..) and putIfAbsent(..), we'll use that slot and
// deallocate the one we allocated
+ if (DEBUG_MODE) LOGGER.finer("del job slot " + TypeUtil.Global.toString(jobSlot) + " due to conflict");
jobArenaMgr.deallocate(jobSlot);
jobSlot = oldSlot;
}
@@ -487,6 +510,9 @@
resArenaMgr.setPkHashVal(resSlot, entityHashValue);
resArenaMgr.setNext(resSlot, group.firstResourceIndex.get());
group.firstResourceIndex.set(resSlot);
+ if (DEBUG_MODE) LOGGER.finer("new res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
+ } else {
+ if (DEBUG_MODE) LOGGER.finer("fnd res slot " + TypeUtil.Global.toString(resSlot) + " (" + dsId + ", " + entityHashValue + ")");
}
return resSlot;
}
@@ -496,6 +522,12 @@
reqArenaMgr.setResourceId(reqSlot, resSlot);
reqArenaMgr.setLockMode(reqSlot, lockMode); // lock mode is a byte!!
reqArenaMgr.setJobSlot(reqSlot, jobSlot);
+ if (DEBUG_MODE) {
+ LOGGER.finer("new req slot " + TypeUtil.Global.toString(reqSlot)
+ + " (" + TypeUtil.Global.toString(resSlot)
+ + ", " + TypeUtil.Global.toString(jobSlot)
+ + ", " + LockMode.toString(lockMode) + ")");
+ }
return reqSlot;
}
@@ -538,6 +570,7 @@
}
private long findResourceInGroup(ResourceGroup group, int dsId, int entityHashValue) {
+ stats.logCounters(LOGGER, Level.INFO, false);
long resSlot = group.firstResourceIndex.get();
while (resSlot != -1) {
// either we already have a lock on this resource or we have a
@@ -551,7 +584,7 @@
}
return -1;
}
-
+
private void addHolder(long request, long resource, long job) {
long lastHolder = resArenaMgr.getLastHolder(resource);
reqArenaMgr.setNextRequest(request, lastHolder);
@@ -766,7 +799,7 @@
}
private void log(String string, int id, int entityHashValue, byte lockMode, ITransactionContext txnContext) {
- if (! LOGGER.isLoggable(Level.FINEST)) {
+ if (! LOGGER.isLoggable(LVL)) {
return;
}
StringBuilder sb = new StringBuilder();
@@ -784,7 +817,7 @@
sb.append(" , jobId : ").append(txnContext.getJobId());
}
sb.append(" }");
- LOGGER.finest(sb.toString());
+ LOGGER.log(LVL, sb.toString());
}
private void validateJob(ITransactionContext txnContext) throws ACIDException {
@@ -855,6 +888,7 @@
os.write(toString().getBytes());
os.flush();
} catch (IOException e) {
+ LOGGER.warning("caught exception when dumping state of ConcurrentLockManager: " + e.toString());
//ignore
}
}
@@ -912,9 +946,9 @@
}
}
- ResourceGroup get(DatasetId dId, int entityHashValue) {
+ ResourceGroup get(int dId, int entityHashValue) {
// TODO ensure good properties of hash function
- int h = Math.abs(dId.getId() ^ entityHashValue);
+ int h = Math.abs(dId ^ entityHashValue);
if (h < 0) h = 0;
return table[h % TABLE_SIZE];
}
@@ -979,6 +1013,7 @@
try {
condition.await();
} catch (InterruptedException e) {
+ LOGGER.finer("interrupted while wating on ResourceGroup");
throw new ACIDException(txnContext, "interrupted", e);
}
}
@@ -989,8 +1024,8 @@
}
void log(String s) {
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest(s + " " + toString());
+ if (LOGGER.isLoggable(LVL)) {
+ LOGGER.log(LVL, s + " " + toString());
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DummyLockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DummyLockManager.java
new file mode 100644
index 0000000..691ae16
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DummyLockManager.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+
+
+/**
+ * A no-op implementation of the ILockManager interface: every request is
+ * treated as successful and nothing is recorded about it. It can be used for
+ * jobs that are known to be conflict free, but it'll yield terrible results
+ * if there are conflicts.
+ *
+ * @author tillw
+ */
+public class DummyLockManager implements ILockManager, ILifeCycleComponent {
+
+    /**
+     * Creates the lock manager.
+     *
+     * @param transactionSubsystem accepted but not used by this implementation
+     */
+    public DummyLockManager(TransactionSubsystem transactionSubsystem) {
+        // nothing to set up
+    }
+
+    // ---- blocking requests: return immediately, i.e. always "granted" ----
+
+    /** Does nothing; the request is considered granted. */
+    @Override
+    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        // no-op
+    }
+
+    /** Does nothing; the request is considered granted. */
+    @Override
+    public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext context) throws ACIDException {
+        // no-op
+    }
+
+    // ---- non-blocking requests: always report success ----
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext context) throws ACIDException {
+        return true;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        return true;
+    }
+
+    // ---- release: there is never anything to release ----
+
+    /** Does nothing. */
+    @Override
+    public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode,
+            ITransactionContext txnContext) throws ACIDException {
+        // no-op
+    }
+
+    /** Does nothing. */
+    @Override
+    public void releaseLocks(ITransactionContext txnContext) throws ACIDException {
+        // no-op
+    }
+
+    /**
+     * @return the constant string {@code "DummyLockManager"}
+     */
+    @Override
+    public String prettyPrint() throws ACIDException {
+        return "DummyLockManager";
+    }
+
+    // ---- life cycle: no resources are held, so both hooks are empty ----
+
+    /** Does nothing. */
+    @Override
+    public void start() {
+        // no-op
+    }
+
+    /** Does nothing; in particular no state is written to the stream. */
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+        // no-op
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerStats.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerStats.java
new file mode 100644
index 0000000..14d1775
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerStats.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.locking;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Counts the calls recorded for each lock manager operation and can write the
+ * counters to a logger, either unconditionally or whenever the total number of
+ * recorded requests is a multiple of the configured logging period.
+ * <p>
+ * Each counter is an {@link AtomicLong}, so a single increment is atomic. The
+ * combined values produced by {@link #requestSum()} and {@link #toString()}
+ * read the counters one after another and are not a consistent snapshot while
+ * other threads keep counting.
+ */
+final class LockManagerStats {
+    /** Interval (in recorded requests) for periodic logging; 0 disables periodic logging. */
+    private final int loggingPeriod;
+
+    private final AtomicLong lCnt = new AtomicLong();   // incremented by lock()
+    private final AtomicLong ilCnt = new AtomicLong();  // incremented by instantLock()
+    private final AtomicLong tlCnt = new AtomicLong();  // incremented by tryLock()
+    private final AtomicLong itlCnt = new AtomicLong(); // incremented by instantTryLock()
+    private final AtomicLong ulCnt = new AtomicLong();  // incremented by unlock()
+    private final AtomicLong rlCnt = new AtomicLong();  // incremented by releaseLocks()
+
+    /**
+     * @param loggingPeriod
+     *            {@link #logCounters(Logger, Level, boolean)} logs when the
+     *            request total is a multiple of this value; with 0 it only
+     *            logs when asked to log always
+     */
+    LockManagerStats(int loggingPeriod) {
+        this.loggingPeriod = loggingPeriod;
+    }
+
+    // one recording method per counted operation
+    final void lock()           { lCnt.incrementAndGet(); }
+    final void instantLock()    { ilCnt.incrementAndGet(); }
+    final void tryLock()        { tlCnt.incrementAndGet(); }
+    final void instantTryLock() { itlCnt.incrementAndGet(); }
+    final void unlock()         { ulCnt.incrementAndGet(); }
+    final void releaseLocks()   { rlCnt.incrementAndGet(); }
+
+    /**
+     * @return the sum of all counters, narrowed to {@code int}; the value
+     *         wraps around once the total no longer fits into an int
+     */
+    final int requestSum() {
+        return (int) requestTotal();
+    }
+
+    /** Sums all counters as a {@code long}, i.e. without narrowing to int. */
+    private long requestTotal() {
+        return lCnt.get() + ilCnt.get() + tlCnt.get()
+                + itlCnt.get() + ulCnt.get() + rlCnt.get();
+    }
+
+    /**
+     * Appends all counters in the form <code>{ lock : n, ..., releaseLocks : n }</code>.
+     *
+     * @param sb the builder to append to
+     * @return {@code sb}, to allow call chaining
+     */
+    final StringBuilder append(StringBuilder sb) {
+        sb.append("{")
+        .append(" lock : ").append(lCnt)
+        .append(", instantLock : ").append(ilCnt)
+        .append(", tryLock : ").append(tlCnt)
+        .append(", instantTryLock : ").append(itlCnt)
+        .append(", unlock : ").append(ulCnt)
+        .append(", releaseLocks : ").append(rlCnt)
+        .append(" }");
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return append(new StringBuilder()).toString();
+    }
+
+    /**
+     * Logs the counters if {@code lvl} is enabled on {@code logger} and either
+     * {@code always} is set or the request total is a multiple of the logging
+     * period.
+     *
+     * @param logger the logger to write to
+     * @param lvl    the level to log at
+     * @param always if {@code true}, the logging period is ignored
+     */
+    final void logCounters(final Logger logger, final Level lvl, boolean always) {
+        if (!logger.isLoggable(lvl)) {
+            return;
+        }
+        // A period of 0 would make the remainder throw ArithmeticException, so
+        // it is treated as "no periodic logging". The long total keeps the
+        // period boundaries exact even after the int sum would have wrapped.
+        if (always || (loggingPeriod != 0 && requestTotal() % loggingPeriod == 0)) {
+            logger.log(lvl, toString());
+        }
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 1ed6fba..cec6cd3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -50,6 +50,8 @@
private final LinkedBlockingQueue<ILogRecord> syncCommitQ;
private FileChannel fileChannel;
private boolean stop;
+ private DatasetId reusableDsId;
+ private JobId reusableJobId;
public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
this.txnSubsystem = txnSubsystem;
@@ -64,6 +66,8 @@
flushOffset = 0;
isLastPage = false;
syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+ reusableDsId = new DatasetId(-1);
+ reusableJobId = new JobId(-1);
}
////////////////////////////////////
@@ -193,21 +197,19 @@
if (endOffset > beginOffset) {
logPageReader.initializeScan(beginOffset, endOffset);
- DatasetId dsId = new DatasetId(-1);
- JobId jId = new JobId(-1);
ITransactionContext txnCtx = null;
LogRecord logRecord = logPageReader.next();
while (logRecord != null) {
if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
- dsId.setId(logRecord.getDatasetId());
- jId.setId(logRecord.getJobId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
- txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
+ reusableJobId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
+ reusableDsId.setId(logRecord.getDatasetId());
+ txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
- jId.setId(logRecord.getJobId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ reusableJobId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
txnCtx.notifyOptracker(true);
notifyJobTerminator();
}