another bug fix(issue437: lost log record) and code clean up
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index c08462f..354a745 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -19,6 +19,9 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Represent a buffer that is backed by a physical file. Provider custom APIs
@@ -35,6 +38,9 @@
private int bufferNextWriteOffset;
private final int diskSectorSize;
+ private final ReadWriteLock latch;
+ private final AtomicInteger referenceCount;
+
public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
this.filePath = filePath;
buffer = ByteBuffer.allocate(bufferSize);
@@ -48,6 +54,8 @@
bufferLastFlushOffset = 0;
bufferNextWriteOffset = 0;
this.diskSectorSize = diskSectorSize;
+ this.latch = new ReentrantReadWriteLock(true);
+ referenceCount = new AtomicInteger(0);
}
public String getFilePath() {
@@ -200,4 +208,38 @@
}
}
+ @Override
+ public void acquireWriteLatch() {
+ latch.writeLock().lock();
+ }
+
+ @Override
+ public void releaseWriteLatch() {
+ latch.writeLock().unlock();
+ }
+
+ @Override
+ public void acquireReadLatch() {
+ latch.readLock().lock();
+ }
+
+ @Override
+ public void releaseReadLatch() {
+ latch.readLock().unlock();
+ }
+
+ @Override
+ public void incRefCnt() {
+ referenceCount.incrementAndGet();
+ }
+
+ @Override
+ public void decRefCnt() {
+ referenceCount.decrementAndGet();
+ }
+
+ @Override
+ public int getRefCnt() {
+ return referenceCount.get();
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index ce13df4..4f6d2b8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -46,5 +46,19 @@
int getBufferNextWriteOffset();
void setBufferNextWriteOffset(int offset);
+
+ public void acquireWriteLatch();
+
+ public void releaseWriteLatch();
+
+ public void acquireReadLatch();
+
+ public void releaseReadLatch();
+
+ public void incRefCnt();
+
+ public void decRefCnt();
+
+ public int getRefCnt();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index 9f20b5d..26229a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -61,15 +61,6 @@
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
/**
- * Flushes the log records up to the lsn represented by the
- * logicalLogLocator
- *
- * @param logicalLogLocator
- * @throws ACIDException
- */
- public void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException;
-
- /**
* Retrieves the configuration parameters of the ILogManager
*
* @return LogManagerProperties: the configuration parameters for the
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index 80f74cb..ce5649a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -61,5 +61,7 @@
int getLogHeaderSize(byte logType);
int getLogChecksumSize();
+
+ int getCommitLogSize();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 36d052d..333a661 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -27,15 +27,12 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -48,6 +45,9 @@
private final TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
private LogPageFlushThread logPageFlusher;
+ private final int logPageSize;
+ private long statLogSize;
+ private long statLogCount;
/*
* the array of log pages. The number of log pages is configurable. Pages
@@ -62,47 +62,6 @@
*/
private int numLogPages;
- /*
- * Initially all pages have an owner count of 1 that is the LogManager. When
- * a transaction requests to write in a log page, the owner count is
- * incremented. The log manager reserves space in the log page and puts in
- * the log header but leaves the space for the content and the checksum
- * (covering the whole log record). When the content has been put, the log
- * manager computes the checksum and puts it after the content. At this
- * point, the ownership count is decremented as the transaction is done with
- * using the page. When a page is requested to be flushed, logPageFlusher
- * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
- * only if the count is 1(LOG_WRITER: meaning that there is no other
- * transactions who own the page to write logs.) After flushing the page,
- * logPageFlusher set this count to 1.
- */
- private AtomicInteger[] logPageOwnerCount;
-
- static class PageOwnershipStatus {
- public static final int LOG_WRITER = 1;
- public static final int LOG_FLUSHER = 0;
- }
-
- /*
- * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
- * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
- * can allocate space in the page for writing a log record. Initially all
- * pages are ACTIVE. As transactions fill up space by writing log records, a
- * page may not have sufficient space left for serving a request by a
- * transaction. When this happens, the page is flushed to disk by calling
- * logPageFlusher.requestFlush(). In the requestFlush(), after
- * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
- * no more writer on the page(meaning the corresponding logPageOwnerCount is
- * 1), the page is flushed by the logPageFlusher and the status is reset to
- * ACTIVE by the logPageFlusher.
- */
- private AtomicInteger[] logPageStatus;
-
- static class PageState {
- public static final int INACTIVE = 0;
- public static final int ACTIVE = 1;
- }
-
private AtomicLong lastFlushedLSN = new AtomicLong(-1);
/*
@@ -129,10 +88,6 @@
return lastFlushedLSN;
}
- public AtomicInteger getLogPageStatus(int pageIndex) {
- return logPageStatus[pageIndex];
- }
-
public AtomicLong getCurrentLsn() {
return lsn;
}
@@ -144,13 +99,19 @@
public LogManager(TransactionSubsystem provider) throws ACIDException {
this.provider = provider;
initLogManagerProperties(this.provider.getId());
+ logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
+ statLogSize = 0;
+ statLogCount = 0;
}
public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
this.provider = provider;
initLogManagerProperties(nodeId);
+ logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
+ statLogSize = 0;
+ statLogCount = 0;
}
/*
@@ -186,9 +147,6 @@
private void initLogManager() throws ACIDException {
logRecordHelper = new LogRecordHelper(this);
numLogPages = logManagerProperties.getNumLogPages();
- logPageOwnerCount = new AtomicInteger[numLogPages];
- logPageStatus = new AtomicInteger[numLogPages];
-
activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
@@ -202,14 +160,6 @@
initLSN();
/*
- * initialize meta data for each log page.
- */
- for (int i = 0; i < numLogPages; i++) {
- logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
- logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
- }
-
- /*
* initialize the log pages.
*/
initializeLogPages(startingLSN);
@@ -226,7 +176,7 @@
}
public int getLogPageIndex(long lsnValue) {
- return (int) (((lsnValue - startingLSN) / logManagerProperties.getLogPageSize()) % numLogPages);
+ return (int) (((lsnValue - startingLSN) / logPageSize) % numLogPages);
}
/*
@@ -242,28 +192,7 @@
* record is (to be) placed.
*/
public int getLogPageOffset(long lsnValue) {
- return (int) (lsnValue % logManagerProperties.getLogPageSize());
- }
-
- /*
- * a transaction thread under certain scenarios is required to wait until
- * the page where it has to write a log record becomes available for writing
- * a log record.
- */
- private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
- if (logPageStatus[pageIndex].get() == PageState.ACTIVE
- && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
- return;
- }
- try {
- synchronized (logPages[pageIndex]) {
- while (!(logPageStatus[pageIndex].get() == PageState.ACTIVE && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER)) {
- logPages[pageIndex].wait();
- }
- }
- } catch (InterruptedException e) {
- throw new ACIDException(" thread interrupted while waiting for page " + pageIndex + " to be available ", e);
- }
+ return (int) (lsnValue % logPageSize);
}
/*
@@ -277,7 +206,6 @@
* @param logType: the type of log record.
*/
private long getLsn(int entrySize, byte logType) throws ACIDException {
- long pageSize = logManagerProperties.getLogPageSize();
while (true) {
boolean forwardPage = false;
@@ -294,9 +222,9 @@
// check if the log record will cross page boundaries, a case that
// is not allowed.
- if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+ if ((next - 1) / logPageSize != old / logPageSize || (next % logPageSize == 0)) {
- if ((old != 0 && old % pageSize == 0)) {
+ if ((old != 0 && old % logPageSize == 0)) {
// On second thought, this shall never be the case as it
// means that the lsn is
// currently at the beginning of a page and we still need to
@@ -309,7 +237,7 @@
} else {
// set the lsn to point to the beginning of the next page.
- retVal = ((old / pageSize) + 1) * pageSize;
+ retVal = ((old / logPageSize) + 1) * logPageSize;
}
next = retVal;
@@ -323,20 +251,6 @@
pageIndex = getNextPageInSequence(pageIndex);
}
- /*
- * we do not want to keep allocating LSNs if the corresponding page
- * is unavailable. Consider a scenario when the log flusher thread
- * is incredibly slow in flushing pages. Transaction threads will
- * acquire an lsn each for writing their next log record. When a
- * page has been made available, mulltiple transaction threads that
- * were waiting can continue to write their log record at the
- * assigned LSNs. Two transaction threads may get LSNs that are on
- * the same log page but actually differ by the size of the log
- * buffer. This would be erroneous. Transaction threads are made to
- * wait upfront for avoiding this situation.
- */
- waitUntillPageIsAvailableForWritingLog(pageIndex);
-
if (!lsn.compareAndSet(old, next)) {
// Atomic call -> returns true only when the value represented
// by lsn is same as
@@ -347,7 +261,7 @@
if (forwardPage) {
// forward the nextWriteOffset in the log page
- logPages[pageIndex].setBufferNextWriteOffset(logManagerProperties.getLogPageSize());
+ logPages[pageIndex].setBufferNextWriteOffset(logPageSize);
addFlushRequest(prevPage, old, false);
@@ -356,21 +270,18 @@
continue;
} else {
- // the transaction thread has been given a space in a log page,
- // but is made to wait until the page is available.
- // (Is this needed? when does this wait happen?)
- waitUntillPageIsAvailableForWritingLog(pageIndex);
-
+ logPages[pageIndex].acquireReadLatch();
// increment the counter as the transaction thread now holds a
// space in the log page and hence is an owner.
- logPageOwnerCount[pageIndex].incrementAndGet();
+ logPages[pageIndex].incRefCnt();
+ logPages[pageIndex].releaseReadLatch();
// Before the count is incremented, if the flusher flushed the
// allocated page,
// then retry to get new LSN. Otherwise, the log with allocated
// lsn will be lost.
if (lastFlushedLSN.get() >= retVal) {
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
continue;
}
}
@@ -400,10 +311,10 @@
int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
// check for the total space requirement to be less than a log page.
- if (totalLogSize > logManagerProperties.getLogPageSize()) {
+ if (totalLogSize > logPageSize) {
throw new ACIDException(
" Maximum Log Content Size is "
- + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
+ + (logPageSize - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
.getLogChecksumSize()));
}
@@ -487,9 +398,9 @@
checksum);
// forward the nextWriteOffset in the log page
- int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logManagerProperties.getLogPageSize());
+ int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logPageSize);
if (bufferNextWriteOffset == 0) {
- bufferNextWriteOffset = logManagerProperties.getLogPageSize();
+ bufferNextWriteOffset = logPageSize;
}
logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
@@ -499,7 +410,11 @@
// release the ownership as the log record has been placed in
// created space.
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
+
+ //collect statistics
+ statLogSize += totalLogSize;
+ statLogCount++;
// indicating that the transaction thread has released ownership
decremented = true;
@@ -516,6 +431,9 @@
addFlushRequest(pageIndex, currentLSN, false);
} else if (logType == LogType.COMMIT) {
addFlushRequest(pageIndex, currentLSN, true);
+ if (IS_DEBUG_MODE) {
+ System.out.println("Running sum of log size: " + statLogSize + ", log count: " + statLogCount);
+ }
}
} catch (Exception e) {
@@ -524,7 +442,7 @@
+ " logger encountered exception", e);
} finally {
if (!decremented) {
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
}
}
}
@@ -537,14 +455,13 @@
String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
- logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
- logManagerProperties.getLogPageSize());
+ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), logPageSize);
}
@Override
public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
ACIDException {
- LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logManagerProperties.getLogPageSize());
+ LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logPageSize);
return cursor;
}
@@ -555,7 +472,7 @@
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
long fileOffset = LogUtil.getFileOffset(this, lsnValue);
- ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
+ ByteBuffer buffer = ByteBuffer.allocate(logPageSize);
RandomAccessFile raf = null;
FileChannel fileChannel = null;
try {
@@ -616,7 +533,7 @@
// minimize memory allocation overhead. current code allocates the
// log page size per reading a log record.
- byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+ byte[] pageContent = new byte[logPageSize];
// take a lock on the log page so that the page is not flushed to
// disk interim
@@ -663,8 +580,11 @@
public boolean isMemoryRead(long currentLSN) {
long flushLSN = lastFlushedLSN.get();
- long logPageBeginOffset = flushLSN - (flushLSN % logManagerProperties.getLogPageSize());
- long logPageEndOffset = logPageBeginOffset + logManagerProperties.getLogPageSize();
+ if ((flushLSN + 1) % logPageSize == 0) {
+ return false;
+ }
+ long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);
+ long logPageEndOffset = logPageBeginOffset + logPageSize;
if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
return true;
} else {
@@ -710,9 +630,7 @@
try {
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, startingLSN));
for (int i = 0; i < numLogPages; i++) {
- logPages[i].open(filePath,
- LogUtil.getFileOffset(this, startingLSN) + i * logManagerProperties.getLogPageSize(),
- logManagerProperties.getLogPageSize());
+ logPages[i].open(filePath, LogUtil.getFileOffset(this, startingLSN) + i * logPageSize, logPageSize);
}
} catch (Exception e) {
throw new ACIDException(Thread.currentThread().getName() + " unable to create log buffer", e);
@@ -725,39 +643,23 @@
}
/*
- * This method shall be called by the Buffer manager when it needs to evict
- * a page from the cache. TODO: Change the implementation from a looping
- * logic to event based when log manager support is integrated with the
- * Buffer Manager.
- */
- @Override
- public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
- if (logicalLogLocator.getLsn() > lsn.get()) {
- throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
- }
- while (lastFlushedLSN.get() < logicalLogLocator.getLsn());
- }
-
- /*
* Map each log page to cover a physical byte range over a log file. When a
* page is flushed, the page contents are put to disk in the corresponding
* byte range.
*/
private void initializeLogPages(long beginLsn) throws ACIDException {
try {
- String filePath = LogUtil.getLogFilePath(logManagerProperties,
- LogUtil.getFileId(this, beginLsn));
+ String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, beginLsn));
long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
- long nextBufferWriteOffset = nextDiskWriteOffset % logManagerProperties.getLogPageSize();
+ long nextBufferWriteOffset = nextDiskWriteOffset % logPageSize;
long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
for (int i = 0; i < numLogPages; i++) {
- logPages[i] = FileUtil.getFileBasedBuffer(filePath,
- bufferBeginOffset + i * logManagerProperties.getLogPageSize(),
- logManagerProperties.getLogPageSize(), logManagerProperties.getDiskSectorSize());
+ logPages[i] = FileUtil.getFileBasedBuffer(filePath, bufferBeginOffset + i * logPageSize, logPageSize,
+ logManagerProperties.getDiskSectorSize());
if (i == 0) {
- logPages[i].setBufferLastFlushOffset((int)nextBufferWriteOffset);
- logPages[i].setBufferNextWriteOffset((int)nextBufferWriteOffset);
+ logPages[i].setBufferLastFlushOffset((int) nextBufferWriteOffset);
+ logPages[i].setBufferNextWriteOffset((int) nextBufferWriteOffset);
logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
}
}
@@ -787,10 +689,6 @@
return logPages[pageIndex];
}
- public AtomicInteger getLogPageOwnershipCount(int pageIndex) {
- return logPageOwnerCount[pageIndex];
- }
-
public IFileBasedBuffer[] getLogPages() {
return logPages;
}
@@ -930,64 +828,62 @@
}
synchronized (logManager.getLogPage(flushPageIndex)) {
+ logManager.getLogPage(flushPageIndex).acquireWriteLatch();
+ try {
- // #. sleep for the groupCommitWaitTime
- sleep(groupCommitWaitPeriod);
+ //if the log page is already full, don't wait.
+ if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() > logPageSize
+ - logManager.getLogRecordHelper().getCommitLogSize()) {
+ // #. sleep for the groupCommitWaitTime
+ sleep(groupCommitWaitPeriod);
+ }
- // #. set the logPageStatus to INACTIVE in order to prevent
- // other txns from writing on this page.
- logManager.getLogPageStatus(flushPageIndex).set(PageState.INACTIVE);
+ // #. need to wait until the reference count reaches 0
+ while (logManager.getLogPage(flushPageIndex).getRefCnt() != 0) {
+ sleep(0);
+ }
- // #. need to wait until the logPageOwnerCount reaches 1
- // (LOG_WRITER)
- // meaning every one has finished writing logs on this page.
- while (logManager.getLogPageOwnershipCount(flushPageIndex).get() != PageOwnershipStatus.LOG_WRITER) {
- sleep(0);
+ beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+ // put the content to disk (the thread still has a lock on the log page)
+ logManager.getLogPage(flushPageIndex).flush();
+
+ afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+ // increment the last flushed lsn
+ logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
+
+ // increment currentLSN if currentLSN is less than flushLSN.
+ if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
+ logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
+ }
+
+ // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+ if (afterFlushOffset == logPageSize) {
+ long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex]
+ .getDiskNextWriteOffset() + logBufferSize;
+ logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+ diskNextWriteOffset, flushPageIndex);
+ resetFlushPageIndex = true;
+ }
+
+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
+
+ // #. checks the queue whether there is another flush
+ // request on the same log buffer
+ // If there is another request, then simply remove it.
+ if (flushRequestQueue[flushPageIndex].peek() != null) {
+ flushRequestQueue[flushPageIndex].take();
+ }
+
+ // notify all waiting (transaction) threads.
+ logManager.getLogPage(flushPageIndex).notifyAll();
+
+ } finally {
+ logManager.getLogPage(flushPageIndex).releaseWriteLatch();
}
- // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
- // meaning it is flushing.
- logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_FLUSHER);
-
- beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
-
- // put the content to disk (the thread still has a lock on
- // the log page)
- logManager.getLogPage(flushPageIndex).flush();
-
- afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
-
- // increment the last flushed lsn
- logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
-
- // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
- if (afterFlushOffset == logPageSize) {
- long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
- + logBufferSize;
- logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
- diskNextWriteOffset, flushPageIndex);
- resetFlushPageIndex = true;
- }
-
- // decrement activeTxnCountOnIndexes
- logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
-
- // reset the count to 1
- logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_WRITER);
-
- // mark the page as ACTIVE
- logManager.getLogPageStatus(flushPageIndex).set(LogManager.PageState.ACTIVE);
-
- // #. checks the queue whether there is another flush
- // request on the same log buffer
- // If there is another request, then simply remove it.
- if (flushRequestQueue[flushPageIndex].peek() != null) {
- flushRequestQueue[flushPageIndex].take();
- }
-
- // notify all waiting (transaction) threads.
- logManager.getLogPage(flushPageIndex).notifyAll();
-
if (resetFlushPageIndex) {
flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
resetFlushPageIndex = false;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 8eae204..e46b7ac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -61,6 +61,8 @@
private final int RESOURCE_MGR_ID_POS = 33;
private final int LOG_RECORD_SIZE_POS = 34;
+ public final int COMMIT_LOG_SIZE = PREV_LSN_POS + LOG_CHECKSUM_SIZE;
+
private ILogManager logManager;
public LogRecordHelper(ILogManager logManager) {
@@ -236,7 +238,7 @@
if (logType == LogType.UPDATE) {
return 46 + logBodySize;
} else {
- return 25;
+ return COMMIT_LOG_SIZE;
}
}
@@ -253,4 +255,8 @@
public int getLogChecksumSize() {
return LOG_CHECKSUM_SIZE;
}
+
+ public int getCommitLogSize() {
+ return COMMIT_LOG_SIZE;
+ }
}