merged master to variable log page size issue branch
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index 4319b8b..e88d74e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -19,6 +19,9 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Represent a buffer that is backed by a physical file. Provider custom APIs
@@ -27,22 +30,32 @@
public class FileBasedBuffer extends Buffer implements IFileBasedBuffer {
private String filePath;
- private long nextWritePosition;
private FileChannel fileChannel;
private RandomAccessFile raf;
- private int size;
+ private int bufferSize;
- public FileBasedBuffer(String filePath, long offset, int size) throws IOException {
+ private int bufferLastFlushOffset;
+ private int bufferNextWriteOffset;
+ private final int diskSectorSize;
+
+ private final ReadWriteLock latch;
+ private final AtomicInteger referenceCount;
+
+ public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
this.filePath = filePath;
- this.nextWritePosition = offset;
- buffer = ByteBuffer.allocate(size);
+ buffer = ByteBuffer.allocate(bufferSize);
raf = new RandomAccessFile(new File(filePath), "rw");
- raf.seek(offset);
fileChannel = raf.getChannel();
+ fileChannel.position(offset);
fileChannel.read(buffer);
buffer.position(0);
- this.size = size;
- buffer.limit(size);
+ this.bufferSize = bufferSize;
+ buffer.limit(bufferSize);
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
+ this.diskSectorSize = diskSectorSize;
+ latch = new ReentrantReadWriteLock(true);
+ referenceCount = new AtomicInteger(0);
}
public String getFilePath() {
@@ -53,17 +66,9 @@
this.filePath = filePath;
}
- public long getOffset() {
- return nextWritePosition;
- }
-
- public void setOffset(long offset) {
- this.nextWritePosition = offset;
- }
-
@Override
public int getSize() {
- return buffer.limit();
+ return bufferSize;
}
public void clear() {
@@ -72,11 +77,18 @@
@Override
public void flush() throws IOException {
- buffer.position(0);
- buffer.limit(size);
+ //flush
+ int pos = bufferLastFlushOffset;
+ int limit = (((bufferNextWriteOffset - 1) / diskSectorSize) + 1) * diskSectorSize;
+ buffer.position(pos);
+ buffer.limit(limit);
fileChannel.write(buffer);
fileChannel.force(true);
- erase();
+
+ //update variables
+ bufferLastFlushOffset = limit;
+ bufferNextWriteOffset = limit;
+ buffer.limit(bufferSize);
}
@Override
@@ -124,45 +136,110 @@
* starting at offset.
*/
@Override
- public void reset(String filePath, long nextWritePosition, int size) throws IOException {
+ public void reset(String filePath, long diskNextWriteOffset, int bufferSize) throws IOException {
if (!filePath.equals(this.filePath)) {
raf.close();//required?
fileChannel.close();
raf = new RandomAccessFile(filePath, "rw");
this.filePath = filePath;
}
- this.nextWritePosition = nextWritePosition;
- raf.seek(nextWritePosition);
fileChannel = raf.getChannel();
+ fileChannel.position(diskNextWriteOffset);
erase();
buffer.position(0);
- buffer.limit(size);
- this.size = size;
+ buffer.limit(bufferSize);
+ this.bufferSize = bufferSize;
+
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
}
-
+
@Override
public void close() throws IOException {
fileChannel.close();
}
-
+
@Override
- public void open(String filePath, long offset, int size) throws IOException {
+ public void open(String filePath, long offset, int bufferSize) throws IOException {
raf = new RandomAccessFile(filePath, "rw");
- this.nextWritePosition = offset;
fileChannel = raf.getChannel();
fileChannel.position(offset);
erase();
buffer.position(0);
- buffer.limit(size);
- this.size = size;
+ buffer.limit(bufferSize);
+ this.bufferSize = bufferSize;
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
}
- public long getNextWritePosition() {
- return nextWritePosition;
+ @Override
+ public long getDiskNextWriteOffset() throws IOException {
+ return fileChannel.position();
}
- public void setNextWritePosition(long nextWritePosition) {
- this.nextWritePosition = nextWritePosition;
+ @Override
+ public void setDiskNextWriteOffset(long offset) throws IOException {
+ fileChannel.position(offset);
}
+ @Override
+ public int getBufferLastFlushOffset() {
+ return bufferLastFlushOffset;
+ }
+
+ @Override
+ public void setBufferLastFlushOffset(int offset) {
+ this.bufferLastFlushOffset = offset;
+ }
+
+ @Override
+ public int getBufferNextWriteOffset() {
+ synchronized (fileChannel) {
+ return bufferNextWriteOffset;
+ }
+ }
+
+ @Override
+ public void setBufferNextWriteOffset(int offset) {
+ synchronized (fileChannel) {
+ if (bufferNextWriteOffset < offset) {
+ bufferNextWriteOffset = offset;
+ }
+ }
+ }
+
+ @Override
+ public void acquireWriteLatch() {
+ latch.writeLock().lock();
+ }
+
+ @Override
+ public void releaseWriteLatch() {
+ latch.writeLock().unlock();
+ }
+
+ @Override
+ public void acquireReadLatch() {
+ latch.readLock().lock();
+ }
+
+ @Override
+ public void releaseReadLatch() {
+ latch.readLock().unlock();
+ }
+
+ @Override
+ public void incRefCnt() {
+ referenceCount.incrementAndGet();
+ }
+
+ @Override
+ public void decRefCnt() {
+ referenceCount.decrementAndGet();
+ }
+
+ @Override
+ public int getRefCnt() {
+ return referenceCount.get();
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
index f2ffa0e..46e03f1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
@@ -38,8 +38,8 @@
return (new File(path)).mkdir();
}
- public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int size) throws IOException {
- IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, size);
+ public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
+ IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, bufferSize, diskSectorSize);
return fileBasedBuffer;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index e1f9f95..a4ea3cb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -31,12 +31,34 @@
*/
public void reset(String filePath, long offset, int size) throws IOException;
- public long getNextWritePosition();
+ public long getDiskNextWriteOffset() throws IOException;
- public void setNextWritePosition(long writePosition);
+ public void setDiskNextWriteOffset(long writePosition) throws IOException;
public void close() throws IOException;
public void open(String filePath, long offset, int size) throws IOException;
+ public int getBufferLastFlushOffset();
+
+ public void setBufferLastFlushOffset(int offset);
+
+ public int getBufferNextWriteOffset();
+
+ public void setBufferNextWriteOffset(int offset);
+
+ public void acquireWriteLatch();
+
+ public void releaseWriteLatch();
+
+ public void acquireReadLatch();
+
+ public void releaseReadLatch();
+
+ public void incRefCnt();
+
+ public void decRefCnt();
+
+ public int getRefCnt();
+
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index c629d03..26229a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -53,17 +53,6 @@
ACIDException;
/**
- * Provides a cursor for retrieving logs that satisfy a given ILogFilter
- * instance. Log records are retrieved in increasing order of lsn
- *
- * @param logFilter
- * specifies the filtering criteria for the retrieved logs
- * @return LogCursor an iterator for the retrieved logs
- * @throws ACIDException
- */
- public ILogCursor readLog(ILogFilter logFilter) throws ACIDException;
-
- /**
* @param logicalLogLocator TODO
* @param PhysicalLogLocator
* specifies the location of the log record to be read
@@ -72,15 +61,6 @@
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
/**
- * Flushes the log records up to the lsn represented by the
- * logicalLogLocator
- *
- * @param logicalLogLocator
- * @throws ACIDException
- */
- public void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException;
-
- /**
* Retrieves the configuration parameters of the ILogManager
*
* @return LogManagerProperties: the configuration parameters for the
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index 80f74cb..0e24f9d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -25,41 +25,43 @@
public interface ILogRecordHelper {
- byte getLogType(LogicalLogLocator logicalLogLocator);
+ public byte getLogType(LogicalLogLocator logicalLogLocator);
- int getJobId(LogicalLogLocator logicalLogLocator);
+ public int getJobId(LogicalLogLocator logicalLogLocator);
- int getDatasetId(LogicalLogLocator logicalLogLocator);
+ public int getDatasetId(LogicalLogLocator logicalLogLocator);
- int getPKHashValue(LogicalLogLocator logicalLogLocator);
+ public int getPKHashValue(LogicalLogLocator logicalLogLocator);
- PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
+ public PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
- boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
+ public boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
- long getResourceId(LogicalLogLocator logicalLogLocator);
+ public long getResourceId(LogicalLogLocator logicalLogLocator);
- byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
+ public byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
- int getLogContentSize(LogicalLogLocator logicalLogLocater);
+ public int getLogContentSize(LogicalLogLocator logicalLogLocater);
- long getLogChecksum(LogicalLogLocator logicalLogLocator);
+ public long getLogChecksum(LogicalLogLocator logicalLogLocator);
- int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
+ public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
- int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
+ public int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
- String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
+ public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
- void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
+ public void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId, int logRecordSize);
- boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
+ public boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
- int getLogRecordSize(byte logType, int logBodySize);
+ public int getLogRecordSize(byte logType, int logBodySize);
- int getLogHeaderSize(byte logType);
+ public int getLogHeaderSize(byte logType);
- int getLogChecksumSize();
+ public int getLogChecksumSize();
+
+ public int getCommitLogSize();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 8a2b188..7e954d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -23,27 +23,16 @@
private final LogManager logManager;
private final ILogFilter logFilter;
+ private final int logPageSize;
private IBuffer readOnlyBuffer;
private LogicalLogLocator logicalLogLocator = null;
- private long bufferIndex = 0;
- private boolean firstNext = true;
- private boolean readMemory = false;
- private long readLSN = 0;
private boolean needReloadBuffer = true;
- /**
- * @param logFilter
- */
- public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
+ public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter,
+ int logPageSize) throws IOException, ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
-
- }
-
- public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
- throws IOException, ACIDException {
- this.logFilter = logFilter;
- this.logManager = logManager;
+ this.logPageSize = logPageSize;
initialize(startingPhysicalLogLocator);
}
@@ -57,7 +46,8 @@
File file = new File(filePath);
if (file.exists()) {
return FileUtil.getFileBasedBuffer(filePath, lsn
- % logManager.getLogManagerProperties().getLogPartitionSize(), size);
+ % logManager.getLogManagerProperties().getLogPartitionSize(), size, logManager
+ .getLogManagerProperties().getDiskSectorSize());
} else {
return null;
}
@@ -87,8 +77,7 @@
return false;
}
- //if the lsn to read is greater than the last flushed lsn, then read from memory
- if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
return readFromMemory(currentLogLocator);
}
@@ -96,10 +85,9 @@
//needReloadBuffer is set to true if the log record is read from the memory log page.
if (needReloadBuffer) {
//log page size doesn't exceed integer boundary
- int offset = (int)(logicalLogLocator.getLsn() % logManager.getLogManagerProperties().getLogPageSize());
+ int offset = (int) (logicalLogLocator.getLsn() % logPageSize);
long adjustedLSN = logicalLogLocator.getLsn() - offset;
- readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logManager.getLogManagerProperties()
- .getLogPageSize());
+ readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logPageSize);
logicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.setMemoryOffset(offset);
needReloadBuffer = false;
@@ -110,14 +98,14 @@
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
- logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
- if (integerRead == logManager.getLogManagerProperties().LOG_MAGIC_NUMBER) {
+ if (integerRead == LogManagerProperties.LOG_MAGIC_NUMBER) {
logRecordBeginPosFound = true;
break;
}
logicalLogLocator.increaseMemoryOffset(1);
logicalLogLocator.incrementLsn();
bytesSkipped++;
- if (bytesSkipped > logManager.getLogManagerProperties().getLogPageSize()) {
+ if (bytesSkipped > logPageSize) {
return false; // the maximum size of a log record is limited to
// a log page size. If we have skipped as many
// bytes without finding a log record, it
@@ -133,10 +121,9 @@
// need to reload the buffer
// TODO
// reduce IO by reading more pages(equal to logBufferSize) at a time.
- long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
- * logManager.getLogManagerProperties().getLogPageSize();
+ long lsnpos = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
- readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogPageSize());
+ readOnlyBuffer = getReadOnlyBuffer(lsnpos, logPageSize);
if (readOnlyBuffer != null) {
logicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.setLsn(lsnpos);
@@ -190,13 +177,12 @@
IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
synchronized (logPage) {
// need to check again if the log record in the log buffer or has reached the disk
- if (lsn > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(lsn)) {
//find the magic number to identify the start of the log record
//----------------------------------------------------------------
int readNumber = -1;
- int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
- int logMagicNumber = logManager.getLogManagerProperties().LOG_MAGIC_NUMBER;
+ int logMagicNumber = LogManagerProperties.LOG_MAGIC_NUMBER;
int bytesSkipped = 0;
boolean logRecordBeginPosFound = false;
//check whether the currentOffset has enough space to have new log record by comparing
@@ -223,7 +209,8 @@
// need to read the next log page
readOnlyBuffer = null;
logicalLogLocator.setBuffer(null);
- logicalLogLocator.setLsn(lsn / logPageSize + 1);
+ lsn = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
+ logicalLogLocator.setLsn(lsn);
logicalLogLocator.setMemoryOffset(0);
return next(currentLogLocator);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 9b8f09c..199fd0f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -27,15 +27,12 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -48,6 +45,9 @@
private final TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
private LogPageFlushThread logPageFlusher;
+ private final int logPageSize;
+ private long statLogSize;
+ private long statLogCount;
/*
* the array of log pages. The number of log pages is configurable. Pages
@@ -62,47 +62,6 @@
*/
private int numLogPages;
- /*
- * Initially all pages have an owner count of 1 that is the LogManager. When
- * a transaction requests to write in a log page, the owner count is
- * incremented. The log manager reserves space in the log page and puts in
- * the log header but leaves the space for the content and the checksum
- * (covering the whole log record). When the content has been put, the log
- * manager computes the checksum and puts it after the content. At this
- * point, the ownership count is decremented as the transaction is done with
- * using the page. When a page is requested to be flushed, logPageFlusher
- * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
- * only if the count is 1(LOG_WRITER: meaning that there is no other
- * transactions who own the page to write logs.) After flushing the page,
- * logPageFlusher set this count to 1.
- */
- private AtomicInteger[] logPageOwnerCount;
-
- static class PageOwnershipStatus {
- public static final int LOG_WRITER = 1;
- public static final int LOG_FLUSHER = 0;
- }
-
- /*
- * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
- * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
- * can allocate space in the page for writing a log record. Initially all
- * pages are ACTIVE. As transactions fill up space by writing log records, a
- * page may not have sufficient space left for serving a request by a
- * transaction. When this happens, the page is flushed to disk by calling
- * logPageFlusher.requestFlush(). In the requestFlush(), after
- * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
- * no more writer on the page(meaning the corresponding logPageOwnerCount is
- * 1), the page is flushed by the logPageFlusher and the status is reset to
- * ACTIVE by the logPageFlusher.
- */
- private AtomicInteger[] logPageStatus;
-
- static class PageState {
- public static final int INACTIVE = 0;
- public static final int ACTIVE = 1;
- }
-
private AtomicLong lastFlushedLSN = new AtomicLong(-1);
/*
@@ -129,10 +88,6 @@
return lastFlushedLSN;
}
- public AtomicInteger getLogPageStatus(int pageIndex) {
- return logPageStatus[pageIndex];
- }
-
public AtomicLong getCurrentLsn() {
return lsn;
}
@@ -144,13 +99,19 @@
public LogManager(TransactionSubsystem provider) throws ACIDException {
this.provider = provider;
initLogManagerProperties(this.provider.getId());
+ logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
+ statLogSize = 0;
+ statLogCount = 0;
}
public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
this.provider = provider;
initLogManagerProperties(nodeId);
+ logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
+ statLogSize = 0;
+ statLogCount = 0;
}
/*
@@ -186,9 +147,6 @@
private void initLogManager() throws ACIDException {
logRecordHelper = new LogRecordHelper(this);
numLogPages = logManagerProperties.getNumLogPages();
- logPageOwnerCount = new AtomicInteger[numLogPages];
- logPageStatus = new AtomicInteger[numLogPages];
-
activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
@@ -199,20 +157,12 @@
/*
* place the log anchor at the end of the last log record written.
*/
- PhysicalLogLocator nextPhysicalLsn = initLSN();
-
- /*
- * initialize meta data for each log page.
- */
- for (int i = 0; i < numLogPages; i++) {
- logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
- logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
- }
+ initLSN();
/*
* initialize the log pages.
*/
- initializeLogPages(nextPhysicalLsn);
+ initializeLogPages(startingLSN);
/*
* Instantiate and begin the LogFlusher thread. The Log Flusher thread
@@ -226,7 +176,7 @@
}
public int getLogPageIndex(long lsnValue) {
- return (int) (((lsnValue - startingLSN) / logManagerProperties.getLogPageSize()) % numLogPages);
+ return (int) (((lsnValue - startingLSN) / logPageSize) % numLogPages);
}
/*
@@ -242,28 +192,7 @@
* record is (to be) placed.
*/
public int getLogPageOffset(long lsnValue) {
- return (int) ((lsnValue - startingLSN) % logManagerProperties.getLogPageSize());
- }
-
- /*
- * a transaction thread under certain scenarios is required to wait until
- * the page where it has to write a log record becomes available for writing
- * a log record.
- */
- private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
- if (logPageStatus[pageIndex].get() == PageState.ACTIVE
- && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
- return;
- }
- try {
- synchronized (logPages[pageIndex]) {
- while (!(logPageStatus[pageIndex].get() == PageState.ACTIVE && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER)) {
- logPages[pageIndex].wait();
- }
- }
- } catch (InterruptedException e) {
- throw new ACIDException(" thread interrupted while waiting for page " + pageIndex + " to be available ", e);
- }
+ return (int) (lsnValue % logPageSize);
}
/*
@@ -277,7 +206,6 @@
* @param logType: the type of log record.
*/
private long getLsn(int entrySize, byte logType) throws ACIDException {
- long pageSize = logManagerProperties.getLogPageSize();
while (true) {
boolean forwardPage = false;
@@ -294,9 +222,9 @@
// check if the log record will cross page boundaries, a case that
// is not allowed.
- if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+ if ((next - 1) / logPageSize != old / logPageSize || (next % logPageSize == 0)) {
- if ((old != 0 && old % pageSize == 0)) {
+ if ((old != 0 && old % logPageSize == 0)) {
// On second thought, this shall never be the case as it
// means that the lsn is
// currently at the beginning of a page and we still need to
@@ -309,7 +237,7 @@
} else {
// set the lsn to point to the beginning of the next page.
- retVal = ((old / pageSize) + 1) * pageSize;
+ retVal = ((old / logPageSize) + 1) * logPageSize;
}
next = retVal;
@@ -323,20 +251,6 @@
pageIndex = getNextPageInSequence(pageIndex);
}
- /*
- * we do not want to keep allocating LSNs if the corresponding page
- * is unavailable. Consider a scenario when the log flusher thread
- * is incredibly slow in flushing pages. Transaction threads will
- * acquire an lsn each for writing their next log record. When a
- * page has been made available, mulltiple transaction threads that
- * were waiting can continue to write their log record at the
- * assigned LSNs. Two transaction threads may get LSNs that are on
- * the same log page but actually differ by the size of the log
- * buffer. This would be erroneous. Transaction threads are made to
- * wait upfront for avoiding this situation.
- */
- waitUntillPageIsAvailableForWritingLog(pageIndex);
-
if (!lsn.compareAndSet(old, next)) {
// Atomic call -> returns true only when the value represented
// by lsn is same as
@@ -345,6 +259,10 @@
}
if (forwardPage) {
+
+ // forward the nextWriteOffset in the log page
+ logPages[pageIndex].setBufferNextWriteOffset(logPageSize);
+
addFlushRequest(prevPage, old, false);
// The transaction thread that discovers the need to forward a
@@ -352,21 +270,18 @@
continue;
} else {
- // the transaction thread has been given a space in a log page,
- // but is made to wait until the page is available.
- // (Is this needed? when does this wait happen?)
- waitUntillPageIsAvailableForWritingLog(pageIndex);
-
+ logPages[pageIndex].acquireReadLatch();
// increment the counter as the transaction thread now holds a
// space in the log page and hence is an owner.
- logPageOwnerCount[pageIndex].incrementAndGet();
+ logPages[pageIndex].incRefCnt();
+ logPages[pageIndex].releaseReadLatch();
// Before the count is incremented, if the flusher flushed the
// allocated page,
// then retry to get new LSN. Otherwise, the log with allocated
// lsn will be lost.
if (lastFlushedLSN.get() >= retVal) {
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
continue;
}
}
@@ -396,10 +311,10 @@
int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
// check for the total space requirement to be less than a log page.
- if (totalLogSize > logManagerProperties.getLogPageSize()) {
+ if (totalLogSize > logPageSize) {
throw new ACIDException(
" Maximum Log Content Size is "
- + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
+ + (logPageSize - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
.getLogChecksumSize()));
}
@@ -482,16 +397,21 @@
logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
checksum);
- if (IS_DEBUG_MODE) {
- System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ // forward the nextWriteOffset in the log page
+ int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logPageSize);
+ if (bufferNextWriteOffset == 0) {
+ bufferNextWriteOffset = logPageSize;
}
+ logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
- // release the ownership as the log record has been placed in
- // created space.
- logPageOwnerCount[pageIndex].decrementAndGet();
+ if (logType != LogType.ENTITY_COMMIT) {
+ // release the ownership as the log record has been placed in
+ // created space.
+ logPages[pageIndex].decRefCnt();
- // indicating that the transaction thread has released ownership
- decremented = true;
+ // indicating that the transaction thread has released ownership
+ decremented = true;
+ }
if (logType == LogType.ENTITY_COMMIT) {
map = activeTxnCountMaps.get(pageIndex);
@@ -502,18 +422,42 @@
} else {
map.put(txnCtx, 1);
}
+ //------------------------------------------------------------------------------
+ // [Notice]
+ // reference count should be decremented
+ // after activeTxnCount is incremented, but before addFlushRequest() is called.
+ //------------------------------------------------------------------------------
+ // release the ownership as the log record has been placed in
+ // created space.
+ logPages[pageIndex].decRefCnt();
+
+ // indicating that the transaction thread has released ownership
+ decremented = true;
+
addFlushRequest(pageIndex, currentLSN, false);
} else if (logType == LogType.COMMIT) {
+
addFlushRequest(pageIndex, currentLSN, true);
+ if (IS_DEBUG_MODE) {
+ System.out.println("Running sum of log size: " + statLogSize + ", log count: " + statLogCount);
+ }
}
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ }
+
+ //collect statistics
+ statLogSize += totalLogSize;
+ statLogCount++;
+
} catch (Exception e) {
e.printStackTrace();
throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
+ " logger encountered exception", e);
} finally {
if (!decremented) {
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
}
}
}
@@ -526,20 +470,13 @@
String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
- logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
- logManagerProperties.getLogPageSize());
- }
-
- @Override
- public ILogCursor readLog(ILogFilter logFilter) throws ACIDException {
- LogCursor cursor = new LogCursor(this, logFilter);
- return cursor;
+ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), logPageSize);
}
@Override
public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
ACIDException {
- LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter);
+ LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logPageSize);
return cursor;
}
@@ -550,7 +487,7 @@
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
long fileOffset = LogUtil.getFileOffset(this, lsnValue);
- ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
+ ByteBuffer buffer = ByteBuffer.allocate(logPageSize);
RandomAccessFile raf = null;
FileChannel fileChannel = null;
try {
@@ -603,7 +540,7 @@
}
/* check if the log record in the log buffer or has reached the disk. */
- if (lsnValue > getLastFlushedLsn().get()) {
+ if (isMemoryRead(lsnValue)) {
int pageIndex = getLogPageIndex(lsnValue);
int pageOffset = getLogPageOffset(lsnValue);
@@ -611,7 +548,7 @@
// minimize memory allocation overhead. current code allocates the
// log page size per reading a log record.
- byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+ byte[] pageContent = new byte[logPageSize];
// take a lock on the log page so that the page is not flushed to
// disk interim
@@ -619,8 +556,8 @@
// need to check again (this thread may have got de-scheduled
// and must refresh!)
- if (lsnValue > getLastFlushedLsn().get()) {
+ if (isMemoryRead(lsnValue)) {
// get the log record length
logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
byte logType = pageContent[pageOffset + 4];
@@ -656,6 +593,20 @@
readDiskLog(lsnValue, logicalLogLocator);
}
+ public boolean isMemoryRead(long currentLSN) {
+ long flushLSN = lastFlushedLSN.get();
+ if ((flushLSN + 1) % logPageSize == 0) {
+ return false;
+ }
+ long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);
+ long logPageEndOffset = logPageBeginOffset + logPageSize;
+ if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
public void renewLogFiles() throws ACIDException {
List<String> logFileNames = LogUtil.getLogFiles(logManagerProperties);
for (String name : logFileNames) {
@@ -670,7 +621,7 @@
logPageFlusher.renew();
}
- private PhysicalLogLocator initLSN() throws ACIDException {
+ private void initLSN() throws ACIDException {
PhysicalLogLocator nextPhysicalLsn = LogUtil.initializeLogAnchor(this);
startingLSN = nextPhysicalLsn.getLsn();
lastFlushedLSN.set(startingLSN - 1);
@@ -678,7 +629,6 @@
LOGGER.info(" Starting lsn is : " + startingLSN);
}
lsn.set(startingLSN);
- return nextPhysicalLsn;
}
private void closeLogPages() throws ACIDException {
@@ -695,9 +645,7 @@
try {
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, startingLSN));
for (int i = 0; i < numLogPages; i++) {
- logPages[i].open(filePath,
- LogUtil.getFileOffset(this, startingLSN) + i * logManagerProperties.getLogPageSize(),
- logManagerProperties.getLogPageSize());
+ logPages[i].open(filePath, LogUtil.getFileOffset(this, startingLSN) + i * logPageSize, logPageSize);
}
} catch (Exception e) {
throw new ACIDException(Thread.currentThread().getName() + " unable to create log buffer", e);
@@ -710,33 +658,25 @@
}
/*
- * This method shall be called by the Buffer manager when it needs to evict
- * a page from the cache. TODO: Change the implementation from a looping
- * logic to event based when log manager support is integrated with the
- * Buffer Manager.
- */
- @Override
- public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
- if (logicalLogLocator.getLsn() > lsn.get()) {
- throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
- }
- while (lastFlushedLSN.get() < logicalLogLocator.getLsn());
- }
-
- /*
* Map each log page to cover a physical byte range over a log file. When a
* page is flushed, the page contents are put to disk in the corresponding
* byte range.
*/
- private void initializeLogPages(PhysicalLogLocator physicalLogLocator) throws ACIDException {
+ private void initializeLogPages(long beginLsn) throws ACIDException {
try {
- String filePath = LogUtil.getLogFilePath(logManagerProperties,
- LogUtil.getFileId(this, physicalLogLocator.getLsn()));
+ String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, beginLsn));
+ long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
+ long nextBufferWriteOffset = nextDiskWriteOffset % logPageSize;
+ long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
+
for (int i = 0; i < numLogPages; i++) {
- logPages[i] = FileUtil.getFileBasedBuffer(
- filePath,
- LogUtil.getFileOffset(this, physicalLogLocator.getLsn()) + i
- * logManagerProperties.getLogPageSize(), logManagerProperties.getLogPageSize());
+ logPages[i] = FileUtil.getFileBasedBuffer(filePath, bufferBeginOffset + i * logPageSize, logPageSize,
+ logManagerProperties.getDiskSectorSize());
+ if (i == 0) {
+ logPages[i].setBufferLastFlushOffset((int) nextBufferWriteOffset);
+ logPages[i].setBufferNextWriteOffset((int) nextBufferWriteOffset);
+ logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
+ }
}
} catch (Exception e) {
e.printStackTrace();
@@ -764,10 +704,6 @@
return logPages[pageIndex];
}
- public AtomicInteger getLogPageOwnershipCount(int pageIndex) {
- return logPageOwnerCount[pageIndex];
- }
-
public IFileBasedBuffer[] getLogPages() {
return logPages;
}
@@ -829,7 +765,7 @@
*/
private final LinkedBlockingQueue<Object>[] flushRequestQueue;
private final Object[] flushRequests;
- private int pageToFlush;
+ private int flushPageIndex;
private final long groupCommitWaitPeriod;
private boolean isRenewRequest;
@@ -843,14 +779,14 @@
flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
flushRequests[i] = new Object();
}
- this.pageToFlush = -1;
+ this.flushPageIndex = 0;
groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
isRenewRequest = false;
}
public void renew() {
isRenewRequest = true;
- pageToFlush = -1;
+ flushPageIndex = 0;
this.interrupt();
isRenewRequest = false;
}
@@ -886,15 +822,19 @@
@Override
public void run() {
+ int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
+ int logBufferSize = logManager.getLogManagerProperties().getLogBufferSize();
+ int beforeFlushOffset = 0;
+ int afterFlushOffset = 0;
+ boolean resetFlushPageIndex = false;
+
while (true) {
try {
- pageToFlush = logManager.getNextPageInSequence(pageToFlush);
-
// A wait call on the linkedBLockingQueue. The flusher thread is
// notified when an object is added to the queue. Please note
// that each page has an associated blocking queue.
try {
- flushRequestQueue[pageToFlush].take();
+ flushRequestQueue[flushPageIndex].take();
} catch (InterruptedException ie) {
while (isRenewRequest) {
sleep(1);
@@ -902,58 +842,67 @@
continue;
}
- synchronized (logManager.getLogPage(pageToFlush)) {
-
- // #. sleep during the groupCommitWaitTime
+ //if the log page is already full, don't wait.
+ if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() < logPageSize
+ - logManager.getLogRecordHelper().getCommitLogSize()) {
+ // #. sleep for the groupCommitWaitTime
sleep(groupCommitWaitPeriod);
+ }
- // #. set the logPageStatus to INACTIVE in order to prevent
- // other txns from writing on this page.
- logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+ synchronized (logManager.getLogPage(flushPageIndex)) {
+ logManager.getLogPage(flushPageIndex).acquireWriteLatch();
+ try {
- // #. need to wait until the logPageOwnerCount reaches 1
- // (LOG_WRITER)
- // meaning every one has finished writing logs on this page.
- while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
- sleep(0);
+ // #. need to wait until the reference count reaches 0
+ while (logManager.getLogPage(flushPageIndex).getRefCnt() != 0) {
+ sleep(0);
+ }
+
+ beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+ // put the content to disk (the thread still has a lock on the log page)
+ logManager.getLogPage(flushPageIndex).flush();
+
+ afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+ // increment the last flushed lsn
+ logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
+
+ // increment currentLSN if currentLSN is less than flushLSN.
+ if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
+ logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
+ }
+
+ // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+ if (afterFlushOffset == logPageSize) {
+ long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
+ + logBufferSize;
+ logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+ diskNextWriteOffset, flushPageIndex);
+ resetFlushPageIndex = true;
+ }
+
+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
+
+ } finally {
+ logManager.getLogPage(flushPageIndex).releaseWriteLatch();
}
- // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
- // meaning it is flushing.
- logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
-
- // put the content to disk (the thread still has a lock on
- // the log page)
- logManager.getLogPage(pageToFlush).flush();
-
- // Map the log page to a new region in the log file.
- long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
- + logManager.getLogManagerProperties().getLogBufferSize();
-
- logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
- + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
-
- // increment the last flushed lsn and lastFlushedPage
- logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
-
- // decrement activeTxnCountOnIndexes
- logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
-
- // reset the count to 1
- logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
-
- // mark the page as ACTIVE
- logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
-
// #. checks the queue whether there is another flush
// request on the same log buffer
// If there is another request, then simply remove it.
- if (flushRequestQueue[pageToFlush].peek() != null) {
- flushRequestQueue[pageToFlush].take();
+ if (flushRequestQueue[flushPageIndex].peek() != null) {
+ flushRequestQueue[flushPageIndex].take();
}
// notify all waiting (transaction) threads.
- logManager.getLogPage(pageToFlush).notifyAll();
+ logManager.getLogPage(flushPageIndex).notifyAll();
+
+ if (resetFlushPageIndex) {
+ flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
+ resetFlushPageIndex = false;
+ }
}
} catch (IOException ioe) {
ioe.printStackTrace();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index 5040fa9..581ce4c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -28,6 +28,7 @@
public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
+ public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
private static final int DEFAULT_NUM_LOG_PAGES = 8;
@@ -35,6 +36,7 @@
private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
+ private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
// follow the naming convention <logFilePrefix>_<number> where number starts from 0
private final String logFilePrefix;
@@ -51,6 +53,8 @@
private final int logBufferSize;
// maximum size of each log file
private final long logPartitionSize;
+ // default disk sector size
+ private final int diskSectorSize;
public LogManagerProperties(Properties properties, String nodeId) {
this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
@@ -66,6 +70,8 @@
this.logBufferSize = logPageSize * numLogPages;
//make sure that the log partition size is the multiple of log buffer size.
this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
+ this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
+ + DEFAULT_DISK_SECTOR_SIZE));
}
public long getLogPartitionSize() {
@@ -99,6 +105,10 @@
public String getLogDirKey() {
return logDirKey;
}
+
+ public int getDiskSectorSize() {
+ return diskSectorSize;
+ }
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -108,6 +118,7 @@
builder.append("num_log_pages : " + numLogPages + FileUtil.lineSeparator);
builder.append("log_partition_size : " + logPartitionSize + FileUtil.lineSeparator);
builder.append("group_commit_wait_period : " + groupCommitWaitPeriod + FileUtil.lineSeparator);
+ builder.append("disk_sector_size : " + diskSectorSize + FileUtil.lineSeparator);
return builder.toString();
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 6b882ef..1b65d8f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -50,6 +50,9 @@
public class LogRecordHelper implements ILogRecordHelper {
private final int LOG_CHECKSUM_SIZE = 8;
+ private final int LOG_HEADER_PART1_SIZE = 17;
+ private final int LOG_HEADER_PART2_SIZE = 21;
+ private final int COMMIT_LOG_SIZE = LOG_HEADER_PART1_SIZE + LOG_CHECKSUM_SIZE;
private final int MAGIC_NO_POS = 0;
private final int LOG_TYPE_POS = 4;
@@ -60,7 +63,9 @@
private final int RESOURCE_ID_POS = 25;
private final int RESOURCE_MGR_ID_POS = 33;
private final int LOG_RECORD_SIZE_POS = 34;
+
+
private ILogManager logManager;
public LogRecordHelper(ILogManager logManager) {
@@ -118,7 +123,11 @@
@Override
public int getLogContentSize(LogicalLogLocator logicalLogLocater) {
- return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ if (getLogType(logicalLogLocater) == LogType.COMMIT || getLogType(logicalLogLocater) == LogType.ENTITY_COMMIT) {
+ return 0;
+ } else {
+ return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ }
}
@Override
@@ -178,7 +187,7 @@
/* magic no */
(logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + MAGIC_NO_POS,
- logManager.getLogManagerProperties().LOG_MAGIC_NUMBER);
+ LogManagerProperties.LOG_MAGIC_NUMBER);
/* log type */
(logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS, logType);
@@ -230,18 +239,18 @@
@Override
public int getLogRecordSize(byte logType, int logBodySize) {
if (logType == LogType.UPDATE) {
- return 46 + logBodySize;
+ return LOG_HEADER_PART1_SIZE + LOG_HEADER_PART2_SIZE + LOG_CHECKSUM_SIZE + logBodySize;
} else {
- return 25;
+ return COMMIT_LOG_SIZE;
}
}
@Override
public int getLogHeaderSize(byte logType) {
if (logType == LogType.UPDATE) {
- return 38;
+ return LOG_HEADER_PART1_SIZE + LOG_HEADER_PART2_SIZE;
} else {
- return 17;
+ return LOG_HEADER_PART1_SIZE;
}
}
@@ -249,4 +258,8 @@
public int getLogChecksumSize() {
return LOG_CHECKSUM_SIZE;
}
+
+ public int getCommitLogSize() {
+ return COMMIT_LOG_SIZE;
+ }
}