changes to deal with entity level commit and setting the lastLSN to the modified LSMIndexes
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@819 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java
index 207ee58..1341cc1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/ILockManager.java
@@ -69,6 +69,15 @@
public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException;
/**
+ *
+ * @param datasetId
+ * @param entityHashValue
+ * @param txnContext
+ * @throws ACIDException TODO
+ */
+ public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag) throws ACIDException;
+
+ /**
* Call to lock and unlock a specific resource in a specific lock mode
* @param datasetId
* @param entityHashValue
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 0691504..db603da 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -24,6 +24,9 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager.TransactionState;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
@@ -39,7 +42,7 @@
* @author pouria, kisskys
*/
-public class LockManager implements ILockManager{
+public class LockManager implements ILockManager {
private static final Logger LOGGER = Logger.getLogger(LockManager.class.getName());
private static final int LOCK_MANAGER_INITIAL_HASH_TABLE_SIZE = 50;// do we need this?
@@ -67,11 +70,10 @@
private LockRequestTracker lockRequestTracker; //for debugging
private ConsecutiveWakeupContext consecutiveWakeupContext;
-
- //----------------------------------------------------------
- // ITransactionManager Members
- //----------------------------------------------------------
- private Map<JobId, TransactionContext> transactionContextRepository = new HashMap<JobId, TransactionContext>();
+
+ //TODO
+ //This code should be taken care properly when there is a way to avoid doubling memory space for txnIds.
+ private LogicalLogLocator logicalLogLocator;
public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
this.txnSubsystem = txnSubsystem;
@@ -88,6 +90,8 @@
this.tempDatasetIdObj = new DatasetId(0);
this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
+ this.logicalLogLocator = LogUtil.getDummyLogicalLogLocator(txnSubsystem.getLogManager());
+
if (IS_DEBUG_MODE) {
this.lockRequestTracker = new LockRequestTracker();
}
@@ -272,7 +276,7 @@
jobInfo = new JobInfo(entityInfoManager, lockWaiterManager, txnContext);
jobHT.put(jobId, jobInfo);
}
-
+
//wait if any upgrader exists or upgrading lock mode is not compatible
if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
|| !dLockInfo.isCompatible(datasetLockMode)) {
@@ -298,10 +302,9 @@
}
//add entityInfo to JobInfo's holding-resource list
jobInfo.addHoldingResource(entityInfo);
-
+
return entityInfo;
- }
- else {
+ } else {
//considered as upgrader
waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, true, true, txnContext, jobInfo, -1);
if (waiterCount > 0) {
@@ -320,7 +323,7 @@
}
}
/////////////////////////////////////////////////////////////////////////////////////////////
-
+
waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, false, true, txnContext, jobInfo, -1);
} else {
waiterCount = 1;
@@ -367,11 +370,11 @@
} else { //duplicated call
entityInfoManager.increaseDatasetLockCount(entityInfo);
datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);
-
+
if (entityHashValue == -1) { //dataset-granule
dLockInfo.increaseLockCount(datasetLockMode);
} else { //entity-granule
- datasetLockMode = datasetLockMode == LockMode.S? LockMode.IS: LockMode.IX;
+ datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
dLockInfo.increaseLockCount(datasetLockMode);
}
}
@@ -441,8 +444,8 @@
//wait if any upgrader exists or upgrading lock mode is not compatible
if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
|| !entityLockInfoManager.isUpgradeCompatible(eLockInfo, lockMode, entityInfo)) {
- waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, true, false, txnContext, jobInfo,
- -1);
+ waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, true, false, txnContext,
+ jobInfo, -1);
} else {
waiterCount = 1;
}
@@ -466,7 +469,8 @@
if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
|| entityLockInfoManager.getFirstWaiter(eLockInfo) != -1
|| !entityLockInfoManager.isCompatible(eLockInfo, lockMode)) {
- waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, false, false, txnContext, jobInfo, -1);
+ waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, false, false, txnContext, jobInfo,
+ -1);
} else {
waiterCount = 1;
}
@@ -488,6 +492,17 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
+ internalUnlock(datasetId, entityHashValue, txnContext, false);
+ }
+
+ @Override
+ public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag)
+ throws ACIDException {
+ internalUnlock(datasetId, entityHashValue, txnContext, commitFlag);
+ }
+
+ private void internalUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext,
+ boolean commitFlag) throws ACIDException {
JobId jobId = txnContext.getJobId();
int eLockInfo = -1;
DatasetLockInfo dLockInfo = null;
@@ -547,6 +562,18 @@
int waitingEntityInfo;
LockWaiter waiterObj;
+ //TODO
+ //This code should be taken care properly when there is a way to avoid doubling memory space for txnIds.
+ //This commit log is written here in order to avoid increasing the memory space for managing transactionIds
+ if (commitFlag) {
+ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
+ txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(), entityHashValue,
+ -1, (byte) 0, 0, null, null, logicalLogLocator);
+ }
+
+ txnContext.setLastLSNToIndexes(logicalLogLocator.getLsn());
+ }
+
//1) wake up waiters and remove holder
//wake up waiters of dataset-granule lock
wakeUpDatasetLockWaiters(dLockInfo);
@@ -625,7 +652,7 @@
JobInfo jobInfo = jobHT.get(jobId);
if (jobInfo == null) {
unlatchLockTable();
- return ;
+ return;
}
//remove waiterObj of JobInfo
@@ -649,12 +676,12 @@
+ entityInfoManager.getJobId(entityInfo) + "'s lock request!!!");
}
}
-
+
//1. remove from waiter(or upgrader)'s list of dLockInfo or eLockInfo.
did = entityInfoManager.getDatasetId(entityInfo);
tempDatasetIdObj.setId(did);
dLockInfo = datasetResourceHT.get(tempDatasetIdObj);
-
+
if (waiterObj.isWaitingOnEntityLock()) {
entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
@@ -670,7 +697,7 @@
dLockInfo.removeUpgrader(waiterObjId);
}
}
-
+
//2. wake-up waiters
latchWaitNotify();
synchronized (waiterObj) {
@@ -682,7 +709,7 @@
}
waiterObj.notifyAll();
}
-
+
//3. deallocate waiterObj
lockWaiterManager.deallocate(waiterObjId);
@@ -711,10 +738,10 @@
datasetLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
if (datasetLockCount != 0) {
dLockInfo.decreaseLockCount(lockMode, datasetLockCount);
-
+
//wakeup waiters of datasetLock and remove holder from datasetLockInfo
wakeUpDatasetLockWaiters(dLockInfo);
-
+
//remove the holder from datasetLockInfo only if the lock is dataset-granule lock.
//--> this also removes the holding resource from jobInfo
//(Because the IX and IS lock's holders are handled implicitly,
@@ -726,7 +753,7 @@
lockMode = entityInfoManager.getDatasetLockMode(entityInfo);
lockMode = lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
datasetLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
-
+
if (datasetLockCount != 0) {
dLockInfo.decreaseLockCount(lockMode, datasetLockCount);
}
@@ -740,7 +767,7 @@
System.out.println("eLockInfo:" + eLockInfo);
}
}
-
+
if (entityLockCount != 0) {
entityLockInfoManager.decreaseLockCount(eLockInfo, lockMode, (short) entityLockCount);
}
@@ -750,10 +777,10 @@
wakeUpDatasetLockWaiters(dLockInfo);
}
- if (entityLockCount != 0) {
+ if (entityLockCount != 0) {
//wakeup waiters of entityLock
wakeUpEntityLockWaiters(eLockInfo);
-
+
//remove the holder from entityLockInfo
//--> this also removes the holding resource from jobInfo
entityLockInfoManager.removeHolder(eLockInfo, entityInfo, jobInfo);
@@ -782,7 +809,7 @@
//remove JobInfo
jobHT.remove(jobId);
-
+
if (existWaiter) {
txnContext.setStatus(TransactionContext.TIMED_OUT_STATUS);
txnContext.setTxnState(TransactionState.ABORTED);
@@ -991,7 +1018,7 @@
}
return null;
}
-
+
public String getRequestHistoryForAllJobs() {
if (IS_DEBUG_MODE) {
return lockRequestTracker.getRequestHistoryForAllJobs();
@@ -1012,7 +1039,7 @@
//see tryLockDatasetGranule() function to know the revert operation
switch (tryLockDatasetGranuleRevertOperation) {
-
+
case 1://[revertOperation1]: reverting 'adding a holder'
if (entityHashValue == -1) {
@@ -1047,7 +1074,7 @@
if (entityHashValue == -1) { //dataset-granule
dLockInfo.decreaseLockCount(datasetLockMode);
} else { //entity-granule
- datasetLockMode = datasetLockMode == LockMode.S? LockMode.IS: LockMode.IX;
+ datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
dLockInfo.decreaseLockCount(datasetLockMode);
}
@@ -1099,11 +1126,11 @@
jobHT.put(jobId, jobInfo);
}
//////////////////////////////////////////////////////////////////////////////////////
-
+
//return fail if any upgrader exists or upgrading lock mode is not compatible
if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
|| !dLockInfo.isCompatible(datasetLockMode)) {
-
+
//[Notice]
//There has been no same caller as (jId, dId, entityHashValue) triplet.
//But there could be the same caller as (jId, dId) pair.
@@ -1123,19 +1150,19 @@
}
//add entityInfo to JobInfo's holding-resource list
jobInfo.addHoldingResource(entityInfo);
-
+
tryLockDatasetGranuleRevertOperation = 1;
-
+
return entityInfo;
}
}
-
+
//revert [part of revertOperation1] before return
if (jobInfo.getLastHoldingResource() == -1 && jobInfo.getFirstWaitingResource() == -1) {
jobHT.remove(jobId);
}
entityInfoManager.deallocate(entityInfo);
-
+
return -2;
}
@@ -1190,14 +1217,14 @@
//[revertOperation3]
entityInfoManager.increaseDatasetLockCount(entityInfo);
datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);
-
+
if (entityHashValue == -1) { //dataset-granule
dLockInfo.increaseLockCount(datasetLockMode);
} else { //entity-granule
- datasetLockMode = datasetLockMode == LockMode.S? LockMode.IS: LockMode.IX;
+ datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
dLockInfo.increaseLockCount(datasetLockMode);
}
-
+
tryLockDatasetGranuleRevertOperation = 3;
//////////////////////////////////////////////////////////////////////////////////////
@@ -1305,7 +1332,8 @@
int waiterCount = 0;
boolean isInterruptedExceptionOccurred = false;
- if (duplicatedWaiterObjId != -1 || isDeadlockFree(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade)) {//deadlock free -> wait
+ if (duplicatedWaiterObjId != -1
+ || isDeadlockFree(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade)) {//deadlock free -> wait
if (duplicatedWaiterObjId == -1) {
waiterId = lockWaiterManager.allocate(); //initial value of waiterObj: wait = true, victim = false
waiter = lockWaiterManager.getLockWaiter(waiterId);
@@ -1362,7 +1390,7 @@
}
}
}
-
+
if (isInterruptedExceptionOccurred) {
throw new ACIDException("InterruptedException is caught");
}
@@ -1407,7 +1435,7 @@
}
//if (!isUpgrade && isDatasetLockInfo) {
- jobInfo.removeWaitingResource(waiterId);
+ jobInfo.removeWaitingResource(waiterId);
//}
lockWaiterManager.deallocate(waiterId);
}
@@ -1429,7 +1457,8 @@
return waiterCount;
}
- private boolean isDeadlockFree(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isDatasetLockInfo, boolean isUpgrade) {
+ private boolean isDeadlockFree(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isDatasetLockInfo,
+ boolean isUpgrade) {
return deadlockDetector.isSafeToAdd(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade);
}
@@ -1439,7 +1468,6 @@
throw new ACIDException("Transaction " + txnContext.getJobId()
+ " should abort (requested by the Lock Manager)");
}
-
/**
* For now, upgrading lock granule from entity-granule to dataset-granule is not supported!!
@@ -1527,13 +1555,14 @@
int entityInfo;
LockWaiter waiterObj;
byte entityLockMode;
-
+
consecutiveWakeupContext.reset();
while (waiterObjId != -1) {
//wake up upgraders
waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
entityInfo = waiterObj.getEntityInfoSlot();
- if (entityLockInfoManager.isUpgradeCompatible(eLockInfo, LockMode.X, entityInfo) && consecutiveWakeupContext.isCompatible(LockMode.X)) {
+ if (entityLockInfoManager.isUpgradeCompatible(eLockInfo, LockMode.X, entityInfo)
+ && consecutiveWakeupContext.isCompatible(LockMode.X)) {
consecutiveWakeupContext.setLockMode(LockMode.X);
latchWaitNotify();
synchronized (waiterObj) {
@@ -1551,7 +1580,7 @@
break;
}
}
-
+
if (areAllUpgradersAwaken) {
//wake up waiters
waiterObjId = entityLockInfoManager.getFirstWaiter(eLockInfo);
@@ -1559,7 +1588,8 @@
waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
entityInfo = waiterObj.getEntityInfoSlot();
entityLockMode = entityInfoManager.getEntityLockMode(entityInfo);
- if (entityLockInfoManager.isCompatible(eLockInfo, entityLockMode) && consecutiveWakeupContext.isCompatible(entityLockMode)) {
+ if (entityLockInfoManager.isCompatible(eLockInfo, entityLockMode)
+ && consecutiveWakeupContext.isCompatible(entityLockMode)) {
consecutiveWakeupContext.setLockMode(entityLockMode);
//compatible waiter is waken up
latchWaitNotify();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 52d766d..ebcd506 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -22,6 +22,7 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
@@ -77,12 +78,19 @@
status = ACTIVE_STATUS;
}
- public void registerIndex(ILSMIndex index) {
- indexes.add(index);
+ public void registerIndexAndCallback(ILSMIndex index, AbstractOperationCallback callback) {
+ synchronized (indexes) {
+ indexes.add(index);
+ callbacks.add(callback);
+ }
}
- public void registerCallback(AbstractOperationCallback callback) {
- callbacks.add(callback);
+ public void setLastLSNToIndexes(long lastLSN) {
+ synchronized (indexes) {
+ for (ILSMIndex index : indexes) {
+ ((IndexOperationTracker) index.getOperationTracker()).setLastLSN(lastLSN);
+ }
+ }
}
public void setTransactionType(TransactionType transactionType) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 0af6303..f537e88 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -36,7 +36,8 @@
}
@Override
- public void abortTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal) throws ACIDException {
+ public void abortTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+ throws ACIDException {
synchronized (txnContext) {
if (txnContext.getTxnState().equals(TransactionState.ABORTED)) {
return;
@@ -82,14 +83,25 @@
}
@Override
- public void commitTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal) throws ACIDException {
+ public void commitTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal)
+ throws ACIDException {
synchronized (txnContext) {
if ((txnContext.getTxnState().equals(TransactionState.COMMITTED))) {
return;
}
+ //There is either job-level commit or entity-level commit.
+ //The job-level commit will have -1 value both for datasetId and PKHashVal.
+
+ //for entity-level commit
+ if (PKHashVal != -1) {
+ transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext);
+ return;
+ }
+
+ //for job-level commit
try {
- if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
+ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
transactionProvider.getLogManager().log(LogType.COMMIT, txnContext, -1, -1, -1, (byte) 0, 0, null,
null, txnContext.getLastLogLocator());
}
@@ -108,7 +120,8 @@
}
@Override
- public void completedTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal, boolean success) throws ACIDException {
+ public void completedTransaction(TransactionContext txnContext, DatasetId datasetId, int PKHashVal, boolean success)
+ throws ACIDException {
if (!success) {
abortTransaction(txnContext, datasetId, PKHashVal);
} else {