inline batchUnlock to avoid dependency on LockManager implementation
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 933afcd..4667c80 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -82,7 +82,7 @@
emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
- emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
+ emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
}
appendLSN = initializeLogAnchor(nextLogFileId);
flushLSN.set(appendLSN);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index a3f42a7..75a74c9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -22,16 +22,20 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILogPage;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
public class LogPage implements ILogPage {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
- private final LockManager lockMgr;
+ private final TransactionSubsystem txnSubsystem;
private final LogPageReader logPageReader;
private final int logPageSize;
private final MutableLong flushLSN;
@@ -46,8 +50,8 @@
private FileChannel fileChannel;
private boolean stop;
- public LogPage(LockManager lockMgr, int logPageSize, MutableLong flushLSN) {
- this.lockMgr = lockMgr;
+ public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
+ this.txnSubsystem = txnSubsystem;
this.logPageSize = logPageSize;
this.flushLSN = flushLSN;
appendBuffer = ByteBuffer.allocate(logPageSize);
@@ -187,7 +191,27 @@
private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
if (endOffset > beginOffset) {
logPageReader.initializeScan(beginOffset, endOffset);
- lockMgr.batchUnlock(this, logPageReader);
+
+ DatasetId dsId = new DatasetId(-1);
+ JobId jId = new JobId(-1);
+ ITransactionContext txnCtx = null;
+
+ LogRecord logRecord = logPageReader.next();
+ while (logRecord != null) {
+ if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
+ dsId.setId(logRecord.getDatasetId());
+ jId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ txnSubsystem.getLockManager().unlock(dsId, logRecord.getPKHashValue(), txnCtx);
+ txnCtx.notifyOptracker(false);
+ } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ jId.setId(logRecord.getJobId());
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jId, false);
+ txnCtx.notifyOptracker(true);
+ notifyJobTerminator();
+ }
+ logRecord = logPageReader.next();
+ }
}
}