changes to fix Issue 409(supporting variable log page size)
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index 4319b8b..c08462f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -27,22 +27,27 @@
public class FileBasedBuffer extends Buffer implements IFileBasedBuffer {
private String filePath;
- private long nextWritePosition;
private FileChannel fileChannel;
private RandomAccessFile raf;
- private int size;
+ private int bufferSize;
- public FileBasedBuffer(String filePath, long offset, int size) throws IOException {
+ private int bufferLastFlushOffset;
+ private int bufferNextWriteOffset;
+ private final int diskSectorSize;
+
+ public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
this.filePath = filePath;
- this.nextWritePosition = offset;
- buffer = ByteBuffer.allocate(size);
+ buffer = ByteBuffer.allocate(bufferSize);
raf = new RandomAccessFile(new File(filePath), "rw");
- raf.seek(offset);
fileChannel = raf.getChannel();
+ fileChannel.position(offset);
fileChannel.read(buffer);
buffer.position(0);
- this.size = size;
- buffer.limit(size);
+ this.bufferSize = bufferSize;
+ buffer.limit(bufferSize);
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
+ this.diskSectorSize = diskSectorSize;
}
public String getFilePath() {
@@ -53,17 +58,9 @@
this.filePath = filePath;
}
- public long getOffset() {
- return nextWritePosition;
- }
-
- public void setOffset(long offset) {
- this.nextWritePosition = offset;
- }
-
@Override
public int getSize() {
- return buffer.limit();
+ return bufferSize;
}
public void clear() {
@@ -72,11 +69,18 @@
@Override
public void flush() throws IOException {
- buffer.position(0);
- buffer.limit(size);
+ //flush
+ int pos = bufferLastFlushOffset;
+ int limit = (((bufferNextWriteOffset - 1) / diskSectorSize) + 1) * diskSectorSize;
+ buffer.position(pos);
+ buffer.limit(limit);
fileChannel.write(buffer);
fileChannel.force(true);
- erase();
+
+ //update variables
+ bufferLastFlushOffset = limit;
+ bufferNextWriteOffset = limit;
+ buffer.limit(bufferSize);
}
@Override
@@ -124,45 +128,76 @@
* starting at offset.
*/
@Override
- public void reset(String filePath, long nextWritePosition, int size) throws IOException {
+ public void reset(String filePath, long diskNextWriteOffset, int bufferSize) throws IOException {
if (!filePath.equals(this.filePath)) {
raf.close();//required?
fileChannel.close();
raf = new RandomAccessFile(filePath, "rw");
this.filePath = filePath;
}
- this.nextWritePosition = nextWritePosition;
- raf.seek(nextWritePosition);
fileChannel = raf.getChannel();
+ fileChannel.position(diskNextWriteOffset);
erase();
buffer.position(0);
- buffer.limit(size);
- this.size = size;
+ buffer.limit(bufferSize);
+ this.bufferSize = bufferSize;
+
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
}
-
+
@Override
public void close() throws IOException {
fileChannel.close();
}
-
+
@Override
- public void open(String filePath, long offset, int size) throws IOException {
+ public void open(String filePath, long offset, int bufferSize) throws IOException {
raf = new RandomAccessFile(filePath, "rw");
- this.nextWritePosition = offset;
fileChannel = raf.getChannel();
fileChannel.position(offset);
erase();
buffer.position(0);
- buffer.limit(size);
- this.size = size;
+ buffer.limit(bufferSize);
+ this.bufferSize = bufferSize;
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
}
- public long getNextWritePosition() {
- return nextWritePosition;
+ @Override
+ public long getDiskNextWriteOffset() throws IOException {
+ return fileChannel.position();
}
- public void setNextWritePosition(long nextWritePosition) {
- this.nextWritePosition = nextWritePosition;
+ @Override
+ public void setDiskNextWriteOffset(long offset) throws IOException {
+ fileChannel.position(offset);
+ }
+
+ @Override
+ public int getBufferLastFlushOffset() {
+ return bufferLastFlushOffset;
+ }
+
+ @Override
+ public void setBufferLastFlushOffset(int offset) {
+ this.bufferLastFlushOffset = offset;
+ }
+
+ @Override
+ public int getBufferNextWriteOffset() {
+ synchronized (fileChannel) {
+ return bufferNextWriteOffset;
+ }
+ }
+
+ @Override
+ public void setBufferNextWriteOffset(int offset) {
+ synchronized (fileChannel) {
+ if (bufferNextWriteOffset < offset) {
+ bufferNextWriteOffset = offset;
+ }
+ }
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
index f2ffa0e..46e03f1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
@@ -38,8 +38,8 @@
return (new File(path)).mkdir();
}
- public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int size) throws IOException {
- IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, size);
+ public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
+ IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, bufferSize, diskSectorSize);
return fileBasedBuffer;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index e1f9f95..ce13df4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -31,12 +31,20 @@
*/
public void reset(String filePath, long offset, int size) throws IOException;
- public long getNextWritePosition();
+ public long getDiskNextWriteOffset() throws IOException;
- public void setNextWritePosition(long writePosition);
+ public void setDiskNextWriteOffset(long writePosition) throws IOException;
public void close() throws IOException;
public void open(String filePath, long offset, int size) throws IOException;
+ int getBufferLastFlushOffset();
+
+ void setBufferLastFlushOffset(int offset);
+
+ int getBufferNextWriteOffset();
+
+ void setBufferNextWriteOffset(int offset);
+
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index c629d03..9f20b5d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -53,17 +53,6 @@
ACIDException;
/**
- * Provides a cursor for retrieving logs that satisfy a given ILogFilter
- * instance. Log records are retrieved in increasing order of lsn
- *
- * @param logFilter
- * specifies the filtering criteria for the retrieved logs
- * @return LogCursor an iterator for the retrieved logs
- * @throws ACIDException
- */
- public ILogCursor readLog(ILogFilter logFilter) throws ACIDException;
-
- /**
* @param logicalLogLocator TODO
* @param PhysicalLogLocator
* specifies the location of the log record to be read
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 8a2b188..1376e1c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -23,27 +23,16 @@
private final LogManager logManager;
private final ILogFilter logFilter;
+ private final int logPageSize;
private IBuffer readOnlyBuffer;
private LogicalLogLocator logicalLogLocator = null;
- private long bufferIndex = 0;
- private boolean firstNext = true;
- private boolean readMemory = false;
- private long readLSN = 0;
private boolean needReloadBuffer = true;
- /**
- * @param logFilter
- */
- public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
+ public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter,
+ int logPageSize) throws IOException, ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
-
- }
-
- public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
- throws IOException, ACIDException {
- this.logFilter = logFilter;
- this.logManager = logManager;
+ this.logPageSize = logPageSize;
initialize(startingPhysicalLogLocator);
}
@@ -57,7 +46,8 @@
File file = new File(filePath);
if (file.exists()) {
return FileUtil.getFileBasedBuffer(filePath, lsn
- % logManager.getLogManagerProperties().getLogPartitionSize(), size);
+ % logManager.getLogManagerProperties().getLogPartitionSize(), size, logManager
+ .getLogManagerProperties().getDiskSectorSize());
} else {
return null;
}
@@ -87,8 +77,7 @@
return false;
}
- //if the lsn to read is greater than the last flushed lsn, then read from memory
- if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
return readFromMemory(currentLogLocator);
}
@@ -96,10 +85,9 @@
//needReloadBuffer is set to true if the log record is read from the memory log page.
if (needReloadBuffer) {
//log page size doesn't exceed integer boundary
- int offset = (int)(logicalLogLocator.getLsn() % logManager.getLogManagerProperties().getLogPageSize());
+ int offset = (int) (logicalLogLocator.getLsn() % logPageSize);
long adjustedLSN = logicalLogLocator.getLsn() - offset;
- readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logManager.getLogManagerProperties()
- .getLogPageSize());
+ readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logPageSize);
logicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.setMemoryOffset(offset);
needReloadBuffer = false;
@@ -117,7 +105,7 @@
logicalLogLocator.increaseMemoryOffset(1);
logicalLogLocator.incrementLsn();
bytesSkipped++;
- if (bytesSkipped > logManager.getLogManagerProperties().getLogPageSize()) {
+ if (bytesSkipped > logPageSize) {
return false; // the maximum size of a log record is limited to
// a log page size. If we have skipped as many
// bytes without finding a log record, it
@@ -133,10 +121,9 @@
// need to reload the buffer
// TODO
// reduce IO by reading more pages(equal to logBufferSize) at a time.
- long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
- * logManager.getLogManagerProperties().getLogPageSize();
+ long lsnpos = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
- readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogPageSize());
+ readOnlyBuffer = getReadOnlyBuffer(lsnpos, logPageSize);
if (readOnlyBuffer != null) {
logicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.setLsn(lsnpos);
@@ -190,12 +177,11 @@
IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
synchronized (logPage) {
// need to check again if the log record in the log buffer or has reached the disk
- if (lsn > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(lsn)) {
//find the magic number to identify the start of the log record
//----------------------------------------------------------------
int readNumber = -1;
- int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
int logMagicNumber = logManager.getLogManagerProperties().LOG_MAGIC_NUMBER;
int bytesSkipped = 0;
boolean logRecordBeginPosFound = false;
@@ -223,7 +209,8 @@
// need to read the next log page
readOnlyBuffer = null;
logicalLogLocator.setBuffer(null);
- logicalLogLocator.setLsn(lsn / logPageSize + 1);
+ lsn = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
+ logicalLogLocator.setLsn(lsn);
logicalLogLocator.setMemoryOffset(0);
return next(currentLogLocator);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 9b8f09c..36d052d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -199,7 +199,7 @@
/*
* place the log anchor at the end of the last log record written.
*/
- PhysicalLogLocator nextPhysicalLsn = initLSN();
+ initLSN();
/*
* initialize meta data for each log page.
@@ -212,7 +212,7 @@
/*
* initialize the log pages.
*/
- initializeLogPages(nextPhysicalLsn);
+ initializeLogPages(startingLSN);
/*
* Instantiate and begin the LogFlusher thread. The Log Flusher thread
@@ -242,7 +242,7 @@
* record is (to be) placed.
*/
public int getLogPageOffset(long lsnValue) {
- return (int) ((lsnValue - startingLSN) % logManagerProperties.getLogPageSize());
+ return (int) (lsnValue % logManagerProperties.getLogPageSize());
}
/*
@@ -345,6 +345,10 @@
}
if (forwardPage) {
+
+ // forward the nextWriteOffset in the log page
+ logPages[pageIndex].setBufferNextWriteOffset(logManagerProperties.getLogPageSize());
+
addFlushRequest(prevPage, old, false);
// The transaction thread that discovers the need to forward a
@@ -482,6 +486,13 @@
logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
checksum);
+ // forward the nextWriteOffset in the log page
+ int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logManagerProperties.getLogPageSize());
+ if (bufferNextWriteOffset == 0) {
+ bufferNextWriteOffset = logManagerProperties.getLogPageSize();
+ }
+ logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
+
if (IS_DEBUG_MODE) {
System.out.println("--------------> LSN(" + currentLSN + ") is written");
}
@@ -531,15 +542,9 @@
}
@Override
- public ILogCursor readLog(ILogFilter logFilter) throws ACIDException {
- LogCursor cursor = new LogCursor(this, logFilter);
- return cursor;
- }
-
- @Override
public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
ACIDException {
- LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter);
+ LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logManagerProperties.getLogPageSize());
return cursor;
}
@@ -603,7 +608,7 @@
}
/* check if the log record in the log buffer or has reached the disk. */
- if (lsnValue > getLastFlushedLsn().get()) {
+ if (isMemoryRead(lsnValue)) {
int pageIndex = getLogPageIndex(lsnValue);
int pageOffset = getLogPageOffset(lsnValue);
@@ -619,8 +624,8 @@
// need to check again (this thread may have got de-scheduled
// and must refresh!)
- if (lsnValue > getLastFlushedLsn().get()) {
+ if (isMemoryRead(lsnValue)) {
// get the log record length
logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
byte logType = pageContent[pageOffset + 4];
@@ -656,6 +661,17 @@
readDiskLog(lsnValue, logicalLogLocator);
}
+ public boolean isMemoryRead(long currentLSN) {
+ long flushLSN = lastFlushedLSN.get();
+ long logPageBeginOffset = flushLSN - (flushLSN % logManagerProperties.getLogPageSize());
+ long logPageEndOffset = logPageBeginOffset + logManagerProperties.getLogPageSize();
+ if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
public void renewLogFiles() throws ACIDException {
List<String> logFileNames = LogUtil.getLogFiles(logManagerProperties);
for (String name : logFileNames) {
@@ -670,7 +686,7 @@
logPageFlusher.renew();
}
- private PhysicalLogLocator initLSN() throws ACIDException {
+ private void initLSN() throws ACIDException {
PhysicalLogLocator nextPhysicalLsn = LogUtil.initializeLogAnchor(this);
startingLSN = nextPhysicalLsn.getLsn();
lastFlushedLSN.set(startingLSN - 1);
@@ -678,7 +694,6 @@
LOGGER.info(" Starting lsn is : " + startingLSN);
}
lsn.set(startingLSN);
- return nextPhysicalLsn;
}
private void closeLogPages() throws ACIDException {
@@ -728,15 +743,23 @@
* page is flushed, the page contents are put to disk in the corresponding
* byte range.
*/
- private void initializeLogPages(PhysicalLogLocator physicalLogLocator) throws ACIDException {
+ private void initializeLogPages(long beginLsn) throws ACIDException {
try {
String filePath = LogUtil.getLogFilePath(logManagerProperties,
- LogUtil.getFileId(this, physicalLogLocator.getLsn()));
+ LogUtil.getFileId(this, beginLsn));
+ long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
+ long nextBufferWriteOffset = nextDiskWriteOffset % logManagerProperties.getLogPageSize();
+ long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
+
for (int i = 0; i < numLogPages; i++) {
- logPages[i] = FileUtil.getFileBasedBuffer(
- filePath,
- LogUtil.getFileOffset(this, physicalLogLocator.getLsn()) + i
- * logManagerProperties.getLogPageSize(), logManagerProperties.getLogPageSize());
+ logPages[i] = FileUtil.getFileBasedBuffer(filePath,
+ bufferBeginOffset + i * logManagerProperties.getLogPageSize(),
+ logManagerProperties.getLogPageSize(), logManagerProperties.getDiskSectorSize());
+ if (i == 0) {
+ logPages[i].setBufferLastFlushOffset((int)nextBufferWriteOffset);
+ logPages[i].setBufferNextWriteOffset((int)nextBufferWriteOffset);
+ logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
+ }
}
} catch (Exception e) {
e.printStackTrace();
@@ -829,7 +852,7 @@
*/
private final LinkedBlockingQueue<Object>[] flushRequestQueue;
private final Object[] flushRequests;
- private int pageToFlush;
+ private int flushPageIndex;
private final long groupCommitWaitPeriod;
private boolean isRenewRequest;
@@ -843,14 +866,14 @@
flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
flushRequests[i] = new Object();
}
- this.pageToFlush = -1;
+ this.flushPageIndex = 0;
groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
isRenewRequest = false;
}
public void renew() {
isRenewRequest = true;
- pageToFlush = -1;
+ flushPageIndex = 0;
this.interrupt();
isRenewRequest = false;
}
@@ -886,15 +909,19 @@
@Override
public void run() {
+ int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
+ int logBufferSize = logManager.getLogManagerProperties().getLogBufferSize();
+ int beforeFlushOffset = 0;
+ int afterFlushOffset = 0;
+ boolean resetFlushPageIndex = false;
+
while (true) {
try {
- pageToFlush = logManager.getNextPageInSequence(pageToFlush);
-
// A wait call on the linkedBLockingQueue. The flusher thread is
// notified when an object is added to the queue. Please note
// that each page has an associated blocking queue.
try {
- flushRequestQueue[pageToFlush].take();
+ flushRequestQueue[flushPageIndex].take();
} catch (InterruptedException ie) {
while (isRenewRequest) {
sleep(1);
@@ -902,58 +929,69 @@
continue;
}
- synchronized (logManager.getLogPage(pageToFlush)) {
+ synchronized (logManager.getLogPage(flushPageIndex)) {
- // #. sleep during the groupCommitWaitTime
+ // #. sleep for the groupCommitWaitTime
sleep(groupCommitWaitPeriod);
// #. set the logPageStatus to INACTIVE in order to prevent
// other txns from writing on this page.
- logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+ logManager.getLogPageStatus(flushPageIndex).set(PageState.INACTIVE);
// #. need to wait until the logPageOwnerCount reaches 1
// (LOG_WRITER)
// meaning every one has finished writing logs on this page.
- while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
+ while (logManager.getLogPageOwnershipCount(flushPageIndex).get() != PageOwnershipStatus.LOG_WRITER) {
sleep(0);
}
// #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
// meaning it is flushing.
- logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
+ logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_FLUSHER);
+
+ beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
// put the content to disk (the thread still has a lock on
// the log page)
- logManager.getLogPage(pageToFlush).flush();
+ logManager.getLogPage(flushPageIndex).flush();
- // Map the log page to a new region in the log file.
- long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
- + logManager.getLogManagerProperties().getLogBufferSize();
+ afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
- logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
- + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
+ // increment the last flushed lsn
+ logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
- // increment the last flushed lsn and lastFlushedPage
- logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
+ // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+ if (afterFlushOffset == logPageSize) {
+ long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
+ + logBufferSize;
+ logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+ diskNextWriteOffset, flushPageIndex);
+ resetFlushPageIndex = true;
+ }
// decrement activeTxnCountOnIndexes
- logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
+ logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
// reset the count to 1
- logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
+ logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_WRITER);
// mark the page as ACTIVE
- logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
+ logManager.getLogPageStatus(flushPageIndex).set(LogManager.PageState.ACTIVE);
// #. checks the queue whether there is another flush
// request on the same log buffer
// If there is another request, then simply remove it.
- if (flushRequestQueue[pageToFlush].peek() != null) {
- flushRequestQueue[pageToFlush].take();
+ if (flushRequestQueue[flushPageIndex].peek() != null) {
+ flushRequestQueue[flushPageIndex].take();
}
// notify all waiting (transaction) threads.
- logManager.getLogPage(pageToFlush).notifyAll();
+ logManager.getLogPage(flushPageIndex).notifyAll();
+
+ if (resetFlushPageIndex) {
+ flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
+ resetFlushPageIndex = false;
+ }
}
} catch (IOException ioe) {
ioe.printStackTrace();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index 5040fa9..581ce4c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -28,6 +28,7 @@
public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
+ public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
private static final int DEFAULT_NUM_LOG_PAGES = 8;
@@ -35,6 +36,7 @@
private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
+ private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
// follow the naming convention <logFilePrefix>_<number> where number starts from 0
private final String logFilePrefix;
@@ -51,6 +53,8 @@
private final int logBufferSize;
// maximum size of each log file
private final long logPartitionSize;
+ // default disk sector size
+ private final int diskSectorSize;
public LogManagerProperties(Properties properties, String nodeId) {
this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
@@ -66,6 +70,8 @@
this.logBufferSize = logPageSize * numLogPages;
//make sure that the log partition size is the multiple of log buffer size.
this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
+ this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
+ + DEFAULT_DISK_SECTOR_SIZE));
}
public long getLogPartitionSize() {
@@ -99,6 +105,10 @@
public String getLogDirKey() {
return logDirKey;
}
+
+ public int getDiskSectorSize() {
+ return diskSectorSize;
+ }
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -108,6 +118,7 @@
builder.append("num_log_pages : " + numLogPages + FileUtil.lineSeparator);
builder.append("log_partition_size : " + logPartitionSize + FileUtil.lineSeparator);
builder.append("group_commit_wait_period : " + groupCommitWaitPeriod + FileUtil.lineSeparator);
+ builder.append("disk_sector_size : " + diskSectorSize + FileUtil.lineSeparator);
return builder.toString();
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 6b882ef..8eae204 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -60,7 +60,7 @@
private final int RESOURCE_ID_POS = 25;
private final int RESOURCE_MGR_ID_POS = 33;
private final int LOG_RECORD_SIZE_POS = 34;
-
+
private ILogManager logManager;
public LogRecordHelper(ILogManager logManager) {
@@ -118,7 +118,11 @@
@Override
public int getLogContentSize(LogicalLogLocator logicalLogLocater) {
- return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ if (getLogType(logicalLogLocater) == LogType.COMMIT || getLogType(logicalLogLocater) == LogType.ENTITY_COMMIT) {
+ return 0;
+ } else {
+ return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ }
}
@Override