use lock counts to determine whether to decrease active transaction count on unlock
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index 6e42e2d..79fbf3b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -64,8 +64,10 @@
* @param txnContext
* @throws ACIDException
* TODO
+ * @return true if the lock count is 0, false otherwise.
*/
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException;
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+ throws ACIDException;
/**
* @param datasetId
@@ -73,8 +75,9 @@
* @param txnContext
* @throws ACIDException
* TODO
+ * @return true if the lock count is 0, false otherwise.
*/
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
throws ACIDException;
/**
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..dc6f111 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -633,14 +633,15 @@
}
@Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, false);
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
+ throws ACIDException {
+ return internalUnlock(datasetId, entityHashValue, txnContext, false, false);
}
@Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
+ public boolean unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+ return internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
}
private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
@@ -648,7 +649,7 @@
internalUnlock(datasetId, entityHashValue, txnContext, true, false);
}
- private void internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
+ private boolean internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
boolean isInstant, boolean commitFlag) throws ACIDException {
JobId jobId = txnContext.getJobId();
int eLockInfo = -1;
@@ -657,6 +658,7 @@
int entityInfo = -1;
byte datasetLockMode;
+ boolean lockCountIsZero = false;
if (IS_DEBUG_MODE) {
if (entityHashValue == -1) {
throw new UnsupportedOperationException(
@@ -704,6 +706,7 @@
if (entityInfoManager.getEntityLockCount(entityInfo) == 0
&& entityInfoManager.getDatasetLockCount(entityInfo) == 0) {
+ lockCountIsZero = true;
int threadCount = 0; //number of threads(in the same job) waiting on the same resource
int waiterObjId = jobInfo.getFirstWaitingResource();
int waitingEntityInfo;
@@ -783,6 +786,7 @@
} finally {
unlatchLockTable();
}
+ return lockCountIsZero;
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..85bdff8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -26,6 +26,7 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
* An implementation of the @see ITransactionManager interface that provides
@@ -105,15 +106,23 @@
//for entity-level commit
if (PKHashVal != -1) {
- transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
- /*****************************
- try {
- //decrease the transaction reference count on index
- txnContext.decreaseActiveTransactionCountOnIndexes();
- } catch (HyracksDataException e) {
- throw new ACIDException("failed to complete index operation", e);
+ boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext,
+ true);
+ if (!countIsZero) {
+ // Lock count != 0 for a particular entity implies that the entity has been locked
+ // more than once (probably due to a hash collision in our current model).
+ // It is safe to decrease the active transaction count on indexes since,
+ // by virtue of the counter not being zero, there is another transaction
+ // that has increased the transaction count. Thus, decreasing it will not
+ // allow the data to be flushed (yet). The flush will occur when the log page
+ // flush thread decides to decrease the count for the last time.
+ try {
+ //decrease the transaction reference count on index
+ txnContext.decreaseActiveTransactionCountOnIndexes();
+ } catch (HyracksDataException e) {
+ throw new ACIDException("failed to complete index operation", e);
+ }
}
- *****************************/
return;
}
@@ -151,11 +160,11 @@
public TransactionSubsystem getTransactionProvider() {
return transactionProvider;
}
-
+
public void setMaxJobId(int jobId) {
maxJobId.set(Math.max(maxJobId.get(), jobId));
}
-
+
public int getMaxJobId() {
return maxJobId.get();
}