reflected code review comments
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index 354a745..e88d74e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -54,7 +54,7 @@
bufferLastFlushOffset = 0;
bufferNextWriteOffset = 0;
this.diskSectorSize = diskSectorSize;
- this.latch = new ReentrantReadWriteLock(true);
+ latch = new ReentrantReadWriteLock(true);
referenceCount = new AtomicInteger(0);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index 4f6d2b8..a4ea3cb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -39,13 +39,13 @@
public void open(String filePath, long offset, int size) throws IOException;
- int getBufferLastFlushOffset();
+ public int getBufferLastFlushOffset();
- void setBufferLastFlushOffset(int offset);
+ public void setBufferLastFlushOffset(int offset);
- int getBufferNextWriteOffset();
+ public int getBufferNextWriteOffset();
- void setBufferNextWriteOffset(int offset);
+ public void setBufferNextWriteOffset(int offset);
public void acquireWriteLatch();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index ce5649a..0e24f9d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -25,43 +25,43 @@
public interface ILogRecordHelper {
- byte getLogType(LogicalLogLocator logicalLogLocator);
+ public byte getLogType(LogicalLogLocator logicalLogLocator);
- int getJobId(LogicalLogLocator logicalLogLocator);
+ public int getJobId(LogicalLogLocator logicalLogLocator);
- int getDatasetId(LogicalLogLocator logicalLogLocator);
+ public int getDatasetId(LogicalLogLocator logicalLogLocator);
- int getPKHashValue(LogicalLogLocator logicalLogLocator);
+ public int getPKHashValue(LogicalLogLocator logicalLogLocator);
- PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
+ public PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
- boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
+ public boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
- long getResourceId(LogicalLogLocator logicalLogLocator);
+ public long getResourceId(LogicalLogLocator logicalLogLocator);
- byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
+ public byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
- int getLogContentSize(LogicalLogLocator logicalLogLocater);
+ public int getLogContentSize(LogicalLogLocator logicalLogLocater);
- long getLogChecksum(LogicalLogLocator logicalLogLocator);
+ public long getLogChecksum(LogicalLogLocator logicalLogLocator);
- int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
+ public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
- int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
+ public int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
- String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
+ public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
- void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
+ public void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId, int logRecordSize);
- boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
+ public boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
- int getLogRecordSize(byte logType, int logBodySize);
+ public int getLogRecordSize(byte logType, int logBodySize);
- int getLogHeaderSize(byte logType);
+ public int getLogHeaderSize(byte logType);
- int getLogChecksumSize();
+ public int getLogChecksumSize();
- int getCommitLogSize();
+ public int getCommitLogSize();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 1376e1c..7e954d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -98,7 +98,7 @@
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
- logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
- if (integerRead == logManager.getLogManagerProperties().LOG_MAGIC_NUMBER) {
+ if (integerRead == LogManagerProperties.LOG_MAGIC_NUMBER) {
logRecordBeginPosFound = true;
break;
}
@@ -182,7 +182,7 @@
//find the magic number to identify the start of the log record
//----------------------------------------------------------------
int readNumber = -1;
- int logMagicNumber = logManager.getLogManagerProperties().LOG_MAGIC_NUMBER;
+ int logMagicNumber = LogManagerProperties.LOG_MAGIC_NUMBER;
int bytesSkipped = 0;
boolean logRecordBeginPosFound = false;
//check whether the currentOffset has enough space to have new log record by comparing
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 333a661..199fd0f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -404,21 +404,15 @@
}
logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
- if (IS_DEBUG_MODE) {
- System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ if (logType != LogType.ENTITY_COMMIT) {
+ // release the ownership as the log record has been placed in
+ // created space.
+ logPages[pageIndex].decRefCnt();
+
+ // indicating that the transaction thread has released ownership
+ decremented = true;
}
- // release the ownership as the log record has been placed in
- // created space.
- logPages[pageIndex].decRefCnt();
-
- //collect statistics
- statLogSize += totalLogSize;
- statLogCount++;
-
- // indicating that the transaction thread has released ownership
- decremented = true;
-
if (logType == LogType.ENTITY_COMMIT) {
map = activeTxnCountMaps.get(pageIndex);
if (map.containsKey(txnCtx)) {
@@ -428,14 +422,35 @@
} else {
map.put(txnCtx, 1);
}
+ //------------------------------------------------------------------------------
+ // [Notice]
+ // reference count should be decremented
+ // after activeTxnCount is incremented, but before addFlushRequest() is called.
+ //------------------------------------------------------------------------------
+ // release the ownership as the log record has been placed in
+ // created space.
+ logPages[pageIndex].decRefCnt();
+
+ // indicating that the transaction thread has released ownership
+ decremented = true;
+
addFlushRequest(pageIndex, currentLSN, false);
} else if (logType == LogType.COMMIT) {
+
addFlushRequest(pageIndex, currentLSN, true);
if (IS_DEBUG_MODE) {
System.out.println("Running sum of log size: " + statLogSize + ", log count: " + statLogCount);
}
}
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ }
+
+ //collect statistics
+ statLogSize += totalLogSize;
+ statLogCount++;
+
} catch (Exception e) {
e.printStackTrace();
throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
@@ -827,17 +842,17 @@
continue;
}
+ //if the log page is already full, don't wait.
+ if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() < logPageSize
+ - logManager.getLogRecordHelper().getCommitLogSize()) {
+ // #. sleep for the groupCommitWaitTime
+ sleep(groupCommitWaitPeriod);
+ }
+
synchronized (logManager.getLogPage(flushPageIndex)) {
logManager.getLogPage(flushPageIndex).acquireWriteLatch();
try {
- //if the log page is already full, don't wait.
- if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() > logPageSize
- - logManager.getLogRecordHelper().getCommitLogSize()) {
- // #. sleep for the groupCommitWaitTime
- sleep(groupCommitWaitPeriod);
- }
-
// #. need to wait until the reference count reaches 0
while (logManager.getLogPage(flushPageIndex).getRefCnt() != 0) {
sleep(0);
@@ -857,33 +872,33 @@
if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
}
-
+
// Map the log page to a new region in the log file if the flushOffset reached the logPageSize
if (afterFlushOffset == logPageSize) {
- long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex]
- .getDiskNextWriteOffset() + logBufferSize;
+ long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
+ + logBufferSize;
logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
diskNextWriteOffset, flushPageIndex);
resetFlushPageIndex = true;
}
-
+
// decrement activeTxnCountOnIndexes
logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
- // #. checks the queue whether there is another flush
- // request on the same log buffer
- // If there is another request, then simply remove it.
- if (flushRequestQueue[flushPageIndex].peek() != null) {
- flushRequestQueue[flushPageIndex].take();
- }
-
- // notify all waiting (transaction) threads.
- logManager.getLogPage(flushPageIndex).notifyAll();
-
} finally {
logManager.getLogPage(flushPageIndex).releaseWriteLatch();
}
+ // #. checks the queue whether there is another flush
+ // request on the same log buffer
+ // If there is another request, then simply remove it.
+ if (flushRequestQueue[flushPageIndex].peek() != null) {
+ flushRequestQueue[flushPageIndex].take();
+ }
+
+ // notify all waiting (transaction) threads.
+ logManager.getLogPage(flushPageIndex).notifyAll();
+
if (resetFlushPageIndex) {
flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
resetFlushPageIndex = false;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index e46b7ac..1b65d8f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -50,6 +50,9 @@
public class LogRecordHelper implements ILogRecordHelper {
private final int LOG_CHECKSUM_SIZE = 8;
+ private final int LOG_HEADER_PART1_SIZE = 17;
+ private final int LOG_HEADER_PART2_SIZE = 21;
+ private final int COMMIT_LOG_SIZE = LOG_HEADER_PART1_SIZE + LOG_CHECKSUM_SIZE;
private final int MAGIC_NO_POS = 0;
private final int LOG_TYPE_POS = 4;
@@ -61,7 +64,7 @@
private final int RESOURCE_MGR_ID_POS = 33;
private final int LOG_RECORD_SIZE_POS = 34;
- public final int COMMIT_LOG_SIZE = PREV_LSN_POS + LOG_CHECKSUM_SIZE;
+
private ILogManager logManager;
@@ -184,7 +187,7 @@
/* magic no */
(logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + MAGIC_NO_POS,
- logManager.getLogManagerProperties().LOG_MAGIC_NUMBER);
+ LogManagerProperties.LOG_MAGIC_NUMBER);
/* log type */
(logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS, logType);
@@ -236,7 +239,7 @@
@Override
public int getLogRecordSize(byte logType, int logBodySize) {
if (logType == LogType.UPDATE) {
- return 46 + logBodySize;
+ return LOG_HEADER_PART1_SIZE + LOG_HEADER_PART2_SIZE + LOG_CHECKSUM_SIZE + logBodySize;
} else {
return COMMIT_LOG_SIZE;
}
@@ -245,9 +248,9 @@
@Override
public int getLogHeaderSize(byte logType) {
if (logType == LogType.UPDATE) {
- return 38;
+ return LOG_HEADER_PART1_SIZE + LOG_HEADER_PART2_SIZE;
} else {
- return 17;
+ return LOG_HEADER_PART1_SIZE;
}
}