fixes for issue333(aborting transaction failure), 334(non-existing storage directory exception in a non-new-universe), 337(true instant lock), and a bug in creating multiple log files
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 092f99c..5b55e9a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -18,12 +18,14 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
/**
* Secondary-index modifications do not require any locking.
@@ -40,8 +42,8 @@
protected final TransactionSubsystem txnSubsystem;
public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
- TransactionContext txnCtx, ILockManager lockManager,
- TransactionSubsystem txnSubsystem, long resourceId, byte resourceType, IndexOperation indexOp) {
+ TransactionContext txnCtx, ILockManager lockManager, TransactionSubsystem txnSubsystem, long resourceId,
+ byte resourceType, IndexOperation indexOp) {
super(datasetId, primaryKeyFields, txnCtx, lockManager);
this.resourceId = resourceId;
this.resourceType = resourceType;
@@ -60,8 +62,21 @@
IndexLogger logger = txnSubsystem.getTreeLoggerRepository().getIndexLogger(resourceId, resourceType);
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
try {
+ IndexOperation effectiveOldOp;
+ if (resourceType == ResourceType.LSM_BTREE) {
+ LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
+ if (before == null) {
+ effectiveOldOp = IndexOperation.NOOP;
+ } else if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
+ effectiveOldOp = IndexOperation.DELETE;
+ } else {
+ effectiveOldOp = IndexOperation.INSERT;
+ }
+ } else {
+ effectiveOldOp = oldOp;
+ }
logger.generateLogRecord(txnSubsystem, txnCtx, datasetId.getId(), pkHash, resourceId, indexOp, after,
- oldOp, before);
+ effectiveOldOp, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 838dc6d..0a12ba1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -87,7 +87,8 @@
//#. load all local resources.
File rootDirFile = new File(this.rootDir);
if (!rootDirFile.exists()) {
- throw new HyracksDataException(rootDirFile.getAbsolutePath() + "doesn't exist.");
+ //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+ return;
}
FilenameFilter filter = new FilenameFilter() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index b1a28ca..b3c7f8b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -617,8 +617,9 @@
throws ACIDException {
internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
}
-
- private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
+
+ private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext)
+ throws ACIDException {
internalUnlock(datasetId, entityHashValue, txnContext, true, false);
}
@@ -981,20 +982,189 @@
@Override
public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
throws ACIDException {
- boolean isGranted = false;
- // try {
- // isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext);
- // return isGranted;
- // } finally {
- // if (isGranted) {
- // unlock(datasetId, entityHashValue, txnContext);
- // }
- // }
- isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext, true);
- if (isGranted) {
- instantUnlock(datasetId, entityHashValue, txnContext);
+ return internalInstantTryLock(datasetId, entityHashValue, lockMode, txnContext);
+ }
+
+ private boolean internalInstantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
+ TransactionContext txnContext) throws ACIDException {
+ DatasetLockInfo dLockInfo = null;
+ boolean isSuccess = true;
+
+ latchLockTable();
+ validateJob(txnContext);
+
+ if (IS_DEBUG_MODE) {
+ trackLockRequest("Requested", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+ txnContext, dLockInfo, -1);
}
- return isGranted;
+
+ dLockInfo = datasetResourceHT.get(datasetId);
+
+ //#. if the datasetLockInfo doesn't exist in datasetResourceHT
+ if (dLockInfo == null || dLockInfo.isNoHolder()) {
+ if (IS_DEBUG_MODE) {
+ trackLockRequest("Granted", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+ txnContext, dLockInfo, -1);
+ }
+
+ unlatchLockTable();
+ return true;
+ }
+
+ //#. the datasetLockInfo exists in datasetResourceHT.
+ //1. handle dataset-granule lock
+ //tryLockDatasetGranuleRevertOperation = 0;
+ isSuccess = instantTryLockDatasetGranule(datasetId, entityHashValue, lockMode, txnContext);
+ if (isSuccess && entityHashValue != -1) {
+ //2. handle entity-granule lock
+ isSuccess = instantTryLockEntityGranule(datasetId, entityHashValue, lockMode, txnContext);
+ }
+
+ if (IS_DEBUG_MODE) {
+ if (isSuccess) {
+ trackLockRequest("Granted", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+ txnContext, dLockInfo, -1);
+ } else {
+ trackLockRequest("Failed", RequestType.INSTANT_TRY_LOCK, datasetId, entityHashValue, lockMode,
+ txnContext, dLockInfo, -1);
+ }
+ }
+
+ unlatchLockTable();
+
+ return isSuccess;
+ }
+
+ private boolean instantTryLockDatasetGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
+ TransactionContext txnContext) throws ACIDException {
+ JobId jobId = txnContext.getJobId();
+ int jId = jobId.getId(); //int-type jobId
+ int dId = datasetId.getId(); //int-type datasetId
+ int waiterObjId;
+ int entityInfo = -1;
+ DatasetLockInfo dLockInfo;
+ JobInfo jobInfo;
+ boolean isUpgrade = false;
+ byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
+
+ dLockInfo = datasetResourceHT.get(datasetId);
+ jobInfo = jobHT.get(jobId);
+
+ //check duplicated call
+
+ //1. lock request causing duplicated upgrading requests from different threads in a same job
+ waiterObjId = dLockInfo.findUpgraderFromUpgraderList(jId, entityHashValue);
+ if (waiterObjId != -1) {
+ return false;
+ }
+
+ //2. lock request causing duplicated waiting requests from different threads in a same job
+ waiterObjId = dLockInfo.findWaiterFromWaiterList(jId, entityHashValue);
+ if (waiterObjId != -1) {
+ return false;
+ }
+
+ //3. lock request causing duplicated holding requests from different threads or a single thread in a same job
+ entityInfo = dLockInfo.findEntityInfoFromHolderList(jId, entityHashValue);
+ if (entityInfo == -1) { //new call from this job -> doesn't mean that eLockInfo doesn't exist since another thread might have create the eLockInfo already.
+
+ //return fail if any upgrader exists or upgrading lock mode is not compatible
+ if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
+ || !dLockInfo.isCompatible(datasetLockMode)) {
+
+ //[Notice]
+ //There has been no same caller as (jId, dId, entityHashValue) triplet.
+ //But there could be the same caller as (jId, dId) pair.
+ //For example, two requests (J1, D1, E1) and (J1, D1, E2) are considered as duplicated call in dataset-granule perspective.
+ //Therefore, the above duplicated call case is covered in the following code.
+ //find the same dataset-granule lock request, that is, (J1, D1) pair in the above example.
+ if (jobInfo != null && jobInfo.isDatasetLockGranted(dId, LockMode.IS)) {
+ if (dLockInfo.isCompatible(datasetLockMode)) {
+ //this is duplicated call
+ return true;
+ }
+ }
+
+ return false;
+ }
+ } else {
+ isUpgrade = isLockUpgrade(entityInfoManager.getDatasetLockMode(entityInfo), lockMode);
+ if (isUpgrade) { //upgrade call
+ //return fail if any upgrader exists or upgrading lock mode is not compatible
+ if (dLockInfo.getFirstUpgrader() != -1 || !dLockInfo.isUpgradeCompatible(datasetLockMode, entityInfo)) {
+ return false;
+ }
+ }
+ /************************************
+ * else { //duplicated call
+ * //do nothing
+ * }
+ *************************************/
+ }
+
+ return true;
+ }
+
+ private boolean instantTryLockEntityGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
+ TransactionContext txnContext) throws ACIDException {
+ JobId jobId = txnContext.getJobId();
+ int jId = jobId.getId(); //int-type jobId
+ int waiterObjId;
+ int eLockInfo = -1;
+ int entityInfo;
+ DatasetLockInfo dLockInfo;
+ boolean isUpgrade = false;
+
+ dLockInfo = datasetResourceHT.get(datasetId);
+ eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
+
+ if (eLockInfo != -1) {
+ //check duplicated call
+
+ //1. lock request causing duplicated upgrading requests from different threads in a same job
+ waiterObjId = entityLockInfoManager.findUpgraderFromUpgraderList(eLockInfo, jId, entityHashValue);
+ if (waiterObjId != -1) {
+ return false;
+ }
+
+ //2. lock request causing duplicated waiting requests from different threads in a same job
+ waiterObjId = entityLockInfoManager.findWaiterFromWaiterList(eLockInfo, jId, entityHashValue);
+ if (waiterObjId != -1) {
+ return false;
+ }
+
+ //3. lock request causing duplicated holding requests from different threads or a single thread in a same job
+ entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jId, entityHashValue);
+ if (entityInfo != -1) {//duplicated call or upgrader
+
+ isUpgrade = isLockUpgrade(entityInfoManager.getEntityLockMode(entityInfo), lockMode);
+ if (isUpgrade) {//upgrade call
+ //wait if any upgrader exists or upgrading lock mode is not compatible
+ if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
+ || !entityLockInfoManager.isUpgradeCompatible(eLockInfo, lockMode, entityInfo)) {
+ return false;
+ }
+ }
+ /***************************
+ * else {//duplicated call
+ * //do nothing
+ * }
+ ****************************/
+ } else {//new call from this job, but still eLockInfo exists since other threads hold it or wait on it
+ if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
+ || entityLockInfoManager.getFirstWaiter(eLockInfo) != -1
+ || !entityLockInfoManager.isCompatible(eLockInfo, lockMode)) {
+ return false;
+ }
+ }
+ }
+ /*******************************
+ * else {//eLockInfo doesn't exist, so this lock request is the first request and can be granted without waiting.
+ * //do nothing
+ * }
+ *********************************/
+
+ return true;
}
private boolean internalTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
@@ -1007,7 +1177,7 @@
DatasetLockInfo dLockInfo = null;
JobInfo jobInfo;
byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
- boolean isSuccess = false;
+ boolean isSuccess = true;
boolean doEscalate = false;
latchLockTable();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 4e96d2e..1e0e0ae 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -524,9 +524,9 @@
* This method resets the log page and is called by the log flusher thread
* after a page has been flushed to disk.
*/
- public void resetLogPage(long nextWritePosition, int pageIndex) throws IOException {
+ public void resetLogPage(long lsn, long nextWritePosition, int pageIndex) throws IOException {
- String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(nextWritePosition));
+ String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
logManagerProperties.getLogPageSize());
@@ -906,19 +906,6 @@
synchronized (logManager.getLogPage(pageToFlush)) {
- // lock the internal state of the log manager and create a
- // log file if necessary.
- int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
- int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
- + logManager.getLogManagerProperties().getLogPageSize());
- if (prevLogFileId != nextLogFileId) {
- String filePath = LogUtil.getLogFilePath(logManager.getLogManagerProperties(), nextLogFileId);
- FileUtil.createFileIfNotExists(filePath);
- logManager.getLogPage(pageToFlush).reset(
- LogUtil.getLogFilePath(logManager.getLogManagerProperties(), nextLogFileId), 0,
- logManager.getLogManagerProperties().getLogPageSize());
- }
-
// #. sleep during the groupCommitWaitTime
sleep(groupCommitWaitPeriod);
@@ -941,6 +928,13 @@
// the log page)
logManager.getLogPage(pageToFlush).flush();
+ // Map the log page to a new region in the log file.
+ long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
+ + logManager.getLogManagerProperties().getLogBufferSize();
+
+ logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
+ + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
+
// increment the last flushed lsn and lastFlushedPage
logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
@@ -950,12 +944,6 @@
// reset the count to 1
logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
- // Map the log page to a new region in the log file.
- long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
- + logManager.getLogManagerProperties().getLogBufferSize();
-
- logManager.resetLogPage(nextWritePosition, pageToFlush);
-
// mark the page as ACTIVE
logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);