[NO ISSUE][TX] Make TxnLogFile Close Idempotent
- user model changes: no
- storage format changes: no
- interface changes: yes
Renamed ILogReader.initializeScan to setPosition and added
javadocs.
Details:
Currently there is an explicit check that the file channel
of a TxnLogFile is open before closing it. However, the
channel could be closed due to interrupts and therefore
we should remove the explicit check and always try to close
it. However, we should always decrement the TxnLogFile
references counter even if the channel is not open since
that TxnLogFile is not accessed anymore.
Change-Id: I255e4b9af0bc78298c0a25daf0b5629d413eba6a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2165
Reviewed-by: Till Westmann <tillw@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 7bc5697..19966fe 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -201,7 +201,7 @@
jobId2WinnerEntitiesMap = new HashMap<>();
//set log reader to the lowWaterMarkLsn
ILogRecord logRecord;
- logReader.initializeScan(lowWaterMarkLSN);
+ logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
while (logRecord != null) {
if (IS_DEBUG_MODE) {
@@ -300,7 +300,7 @@
ILogRecord logRecord = null;
try {
- logReader.initializeScan(lowWaterMarkLSN);
+ logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
while (logRecord != null) {
if (IS_DEBUG_MODE) {
@@ -540,7 +540,7 @@
Set<Integer> activePartitions = localResourceRepository.getActivePartitions();
ILogReader logReader = logMgr.getLogReader(false);
try {
- logReader.initializeScan(firstLSN);
+ logReader.setPosition(firstLSN);
ILogRecord logRecord = null;
while (currentLSN < lastLSN) {
logRecord = logReader.next();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
index da188e3..8539e2b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogReader.java
@@ -18,18 +18,34 @@
*/
package org.apache.asterix.common.transactions;
-import org.apache.asterix.common.exceptions.ACIDException;
-
public interface ILogReader {
- public void initializeScan(long beginLSN) throws ACIDException;
+ /**
+ * Sets the log reader position at log sequence number with value {@code lsn}.
+ *
+ * @param lsn
+ */
+ void setPosition(long lsn);
- //for scanning
- public ILogRecord next() throws ACIDException;
+ /**
+ * Reads and returns the log record located at the log reader current position. After reading the log record,
+ * the log reader position is incremented by the size of the read log.
+ *
+ * @return the log record
+ */
+ ILogRecord next();
- //for random reading
- public ILogRecord read(long readLSN) throws ACIDException;
+ /**
+ * Reads and returns the log record with log sequence number {@code lsn}.
+ *
+ * @param lsn
+ * @return The log record
+ */
+ ILogRecord read(long lsn);
- public void close() throws ACIDException;
+ /**
+ * Closes the log reader and any resources used.
+ */
+ void close();
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index dd0a5c7..cdd957a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -580,7 +580,7 @@
@Override
public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException {
if (!fileChannel.isOpen()) {
- throw new IllegalStateException("File channel is not open");
+ LOGGER.warning(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
}
fileChannel.close();
untouchLogFile(logFileRef.getLogFileId());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 148aa7e..f2c5eef 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -30,14 +30,11 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.MutableLong;
import org.apache.asterix.common.transactions.TxnLogFile;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
-/**
- * NOTE: Many method calls of this class are not thread safe.
- * Be very cautious using it in a multithreaded context.
- */
+@NotThreadSafe
public class LogReader implements ILogReader {
- public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName());
private final ILogManager logMgr;
private final long logFileSize;
@@ -54,7 +51,7 @@
private enum ReturnState {
FLUSH,
EOF
- };
+ }
public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN,
boolean isRecoveryMode) {
@@ -68,8 +65,8 @@
}
@Override
- public void initializeScan(long beginLSN) throws ACIDException {
- readLSN = beginLSN;
+ public void setPosition(long lsn) {
+ readLSN = lsn;
if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
return;
}
@@ -84,7 +81,7 @@
* @throws ACIDException
*/
@Override
- public ILogRecord next() throws ACIDException {
+ public ILogRecord next() {
if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
return null;
}
@@ -147,13 +144,10 @@
return ReturnState.EOF;
}
try {
- if (IS_DEBUG_MODE) {
- LOGGER.info(
- "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN);
- }
flushLSN.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
+ throw new ACIDException(e);
}
}
return ReturnState.FLUSH;
@@ -166,10 +160,9 @@
* @return true if log continues, false if EOF
* @throws ACIDException
*/
- private boolean refillLogReadBuffer() throws ACIDException {
+ private boolean refillLogReadBuffer() {
try {
if (readLSN % logFileSize == logFile.size()) {
- logFile.close();
readLSN += logFileSize - (readLSN % logFileSize);
getLogFile();
}
@@ -183,14 +176,12 @@
* Fills the log buffer with data from the log file at the current position
*
* @return false if EOF, true otherwise
- * @throws ACIDException
*/
-
- private boolean fillLogReadBuffer() throws ACIDException {
+ private boolean fillLogReadBuffer() {
return fillLogReadBuffer(logPageSize, readBuffer);
}
- private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws ACIDException {
+ private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) {
int size = 0;
int read = 0;
readBuffer.position(0);
@@ -217,10 +208,9 @@
return true;
}
- //for random reading
@Override
- public ILogRecord read(long LSN) throws ACIDException {
- readLSN = LSN;
+ public ILogRecord read(long lsn) {
+ readLSN = lsn;
//wait for the log to be flushed if needed before trying to read it.
synchronized (flushLSN) {
while (readLSN >= flushLSN.get()) {
@@ -232,15 +222,10 @@
}
}
try {
- if (logFile == null) {
+ if (logFile == null || readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) {
//get the log file which contains readLSN
getLogFile();
fillLogReadBuffer();
- } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) {
- //log is not in the current log file
- logFile.close();
- getLogFile();
- fillLogReadBuffer();
} else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) {
//log is not in the current read buffer
fillLogReadBuffer();
@@ -265,7 +250,7 @@
case TRUNCATED: {
if (!fillLogReadBuffer()) {
throw new IllegalStateException(
- "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId());
+ "Could not read LSN(" + lsn + ") from log file id " + logFile.getLogFileId());
}
//now read the complete log record
continue;
@@ -285,8 +270,10 @@
return logRecord;
}
- private void getLogFile() throws ACIDException {
+ private void getLogFile() {
try {
+ // close existing file (if any) before opening another one
+ close();
logFile = logMgr.getLogFile(readLSN);
fileBeginLSN = logFile.getFileBeginLSN();
} catch (IOException e) {
@@ -295,10 +282,11 @@
}
@Override
- public void close() throws ACIDException {
+ public void close() {
try {
if (logFile != null) {
logFile.close();
+ logFile = null;
}
} catch (IOException e) {
throw new ACIDException(e);