[ASTERIXDB-2230][TX] Survive Interrupt in Log File Switch
- user model changes: no
- storage format changes: no
- interface changes: yes
- ILogBuffer: (-) setLastPage
- ILogManager: (-) renewLogFilesAndStartFromLSN
Details:
- Survive interrupt in log file switch.
- Make LogManager responsible for closing log files.
- Remove unneeded methods in ILogManager and ILogBuffer.
- Adapt log file switch test case to new behavior.
Change-Id: I191564c510c0555f191a35e2603e051bbef24540
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2301
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index b14d70b..a1978eb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -19,8 +19,6 @@
package org.apache.asterix.test.txn;
import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +44,6 @@
import org.apache.asterix.test.common.TestTupleReference;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants;
-import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.junit.After;
@@ -59,6 +56,7 @@
protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
private static final String PREPARE_NEXT_LOG_FILE_METHOD = "prepareNextLogFile";
+ private static final String ENSURE_LAST_PAGE_FLUSHED_METHOD = "ensureLastPageFlushed";
@Before
public void setUp() throws Exception {
@@ -146,39 +144,20 @@
int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
// ensure an interrupted transactor will create next log file but will fail to position the log channel
- final AtomicBoolean interrupted = new AtomicBoolean(false);
+ final AtomicBoolean failed = new AtomicBoolean(false);
Thread interruptedTransactor = new Thread(() -> {
Thread.currentThread().interrupt();
try {
prepareNextLogFile(logManager);
} catch (Exception e) {
- Throwable rootCause = ExceptionUtils.getRootCause(e);
- if (rootCause.getCause() instanceof java.nio.channels.ClosedByInterruptException) {
- interrupted.set(true);
- }
- }
- });
- interruptedTransactor.start();
- interruptedTransactor.join();
- // ensure a new log file was created but the thread was interrupt
- int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
- Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
- Assert.assertTrue(interrupted.get());
-
- // ensure next transactor will not create another file
- final AtomicBoolean failed = new AtomicBoolean(false);
- Thread transactor = new Thread(() -> {
- try {
- prepareNextLogFile(logManager);
- } catch (Exception e) {
failed.set(true);
}
});
- transactor.start();
- transactor.join();
- // make sure no new files were created and the operation was successful
- int countAfterTransactor = logManager.getLogFileIds().size();
- Assert.assertEquals(logFileCountAfterInterrupt, countAfterTransactor);
+ interruptedTransactor.start();
+ interruptedTransactor.join();
+ // ensure a new log file was created and survived interrupt
+ int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+ Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
Assert.assertFalse(failed.get());
// make sure we can still log to the new file
@@ -196,14 +175,20 @@
}
private static void prepareNextLogFile(LogManager logManager) throws Exception {
- Method method;
+ Method ensureLastPageFlushed;
+ Method prepareNextLogFile;
+ String targetMethod = null;
try {
- method = LogManager.class.getDeclaredMethod(PREPARE_NEXT_LOG_FILE_METHOD, null);
+ targetMethod = ENSURE_LAST_PAGE_FLUSHED_METHOD;
+ ensureLastPageFlushed = LogManager.class.getDeclaredMethod(targetMethod, null);
+ targetMethod = PREPARE_NEXT_LOG_FILE_METHOD;
+ prepareNextLogFile = LogManager.class.getDeclaredMethod(targetMethod, null);
} catch (Exception e) {
- throw new IllegalStateException(
- "Couldn't find " + PREPARE_NEXT_LOG_FILE_METHOD + " in LogManager. Was it renamed?");
+ throw new IllegalStateException("Couldn't find " + targetMethod + " in LogManager. Was it renamed?");
}
- method.setAccessible(true);
- method.invoke(logManager, null);
+ ensureLastPageFlushed.setAccessible(true);
+ ensureLastPageFlushed.invoke(logManager, null);
+ prepareNextLogFile.setAccessible(true);
+ prepareNextLogFile.invoke(logManager, null);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 6bdce73..b4a7d38 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -62,11 +62,6 @@
void reset();
/**
- * Set current page to be the last page of the associated file
- */
- void setLastPage();
-
- /**
* stops the log buffer
*/
void stop();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index aa018ba..d7e0885 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -63,14 +63,6 @@
public String getNodeId();
/**
- * Delete all log files and start new log partition > LSNtoStartFrom
- *
- * @param LSNtoStartFrom
- * @throws IOException
- */
- public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException;
-
- /**
* @return the log page size in bytes
*/
public int getLogPageSize();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
index c718fca..9bdf55c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.utils;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@@ -30,6 +31,9 @@
private static final Logger LOGGER = LogManager.getLogger();
+ private InvokeUtil() {
+ }
+
/**
* Executes the passed interruptible, retrying if the operation is interrupted. Once the interruptible
* completes, the current thread will be re-interrupted, if the original operation was interrupted.
@@ -144,6 +148,31 @@
return false;
}
+ /**
+ * Executes the passed interruptible, retrying if the operation fails due to {@link ClosedByInterruptException} or
+ * {@link InterruptedException}. Once the interruptible completes, the current thread will be re-interrupted, if
+ * the original operation was interrupted.
+ */
+ public static void doIoUninterruptibly(ThrowingIOInterruptible interruptible) throws IOException {
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ interruptible.run();
+ break;
+ } catch (ClosedByInterruptException | InterruptedException e) {
+ LOGGER.error("IO operation Interrupted. Retrying..", e);
+ interrupted = true;
+ Thread.interrupted();
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
@FunctionalInterface
public interface Interruptible {
void run() throws InterruptedException;
@@ -154,4 +183,8 @@
void run() throws Exception; // NOSONAR
}
+ @FunctionalInterface
+ public interface ThrowingIOInterruptible {
+ void run() throws IOException, InterruptedException;
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 011d2a1..614591b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.transaction.management.service.logging;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.LinkedBlockingQueue;
@@ -56,7 +55,6 @@
protected final ByteBuffer appendBuffer;
private final ByteBuffer flushBuffer;
private final ByteBuffer unlockBuffer;
- private boolean isLastPage;
protected final LinkedBlockingQueue<ILogRecord> syncCommitQ;
protected final LinkedBlockingQueue<ILogRecord> flushQ;
protected final LinkedBlockingQueue<ILogRecord> remoteJobsQ;
@@ -76,7 +74,6 @@
full = new AtomicBoolean(false);
appendOffset = 0;
flushOffset = 0;
- isLastPage = false;
syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
flushQ = new LinkedBlockingQueue<>();
remoteJobsQ = new LinkedBlockingQueue<>();
@@ -132,11 +129,6 @@
}
@Override
- public void setLastPage() {
- this.isLastPage = true;
- }
-
- @Override
public boolean hasSpace(int logSize) {
return appendOffset + logSize <= logPageSize && !full.get();
}
@@ -152,7 +144,6 @@
full.set(false);
appendOffset = 0;
flushOffset = 0;
- isLastPage = false;
stop = false;
}
@@ -174,24 +165,18 @@
+ ", full: " + full.get());
}
if (stopping || stop) {
- fileChannel.close();
return;
}
wait();
}
endOffset = appendOffset;
}
- internalFlush(flushOffset, endOffset);
+ internalFlush(flushOffset, endOffset);
} catch (InterruptedException e) {
interrupted = true;
}
}
internalFlush(flushOffset, appendOffset);
- if (isLastPage) {
- fileChannel.close();
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
@@ -235,8 +220,8 @@
reusableTxnId.setId(logRecord.getTxnId());
reusableDatasetId.setId(logRecord.getDatasetId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
- txnSubsystem.getLockManager().unlock(reusableDatasetId, logRecord.getPKHashValue(),
- LockMode.ANY, txnCtx);
+ txnSubsystem.getLockManager()
+ .unlock(reusableDatasetId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
txnCtx.notifyEntityCommitted();
if (txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
txnSubsystem.incrementEntityCommitCount();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 96d0539..c226886 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -93,6 +93,7 @@
private LogFlusher logFlusher;
private Future<? extends Object> futureLogFlusher;
protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
+ private long currentLogFileId;
public LogManager(ITransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
@@ -239,17 +240,18 @@
private void prepareNextLogFile() {
final long nextFileBeginLsn = getNextFileFirstLsn();
try {
+ closeCurrentLogFile();
createNextLogFile();
- setLogPosition(nextFileBeginLsn);
+ InvokeUtil.doIoUninterruptibly(() -> setLogPosition(nextFileBeginLsn));
+ // move appendLSN and flushLSN to the first LSN of the next log file
+ // only after the file was created and the channel was positioned successfully
+ appendLSN.set(nextFileBeginLsn);
+ flushLSN.set(nextFileBeginLsn);
+ LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", currentLogFileId,
+ nextFileBeginLsn);
} catch (IOException e) {
throw new ACIDException(e);
}
- // move appendLSN and flushLSN to the first LSN of the next log file
- // only after the file was created and the channel was positioned successfully
- appendLSN.set(nextFileBeginLsn);
- flushLSN.set(nextFileBeginLsn);
- LOGGER.info("Created new txn log file with id({}) starting with LSN = {}", getLogFileId(nextFileBeginLsn),
- nextFileBeginLsn);
}
private long getNextFileFirstLsn() {
@@ -258,8 +260,6 @@
}
private void ensureLastPageFlushed() {
- // Mark the page as the last page so that it will close the output file channel.
- appendPage.setLastPage();
// Make sure to flush whatever left in the log tail.
appendPage.setFull();
synchronized (flushLSN) {
@@ -301,6 +301,7 @@
@Override
public void stop(boolean dumpState, OutputStream os) {
terminateLogFlusher();
+ closeCurrentLogFile();
if (dumpState) {
dumpState(os);
}
@@ -387,6 +388,7 @@
@Override
public void renewLogFiles() {
terminateLogFlusher();
+ closeCurrentLogFile();
long lastMaxLogFileId = deleteAllLogFiles();
initializeLogManager(lastMaxLogFileId + 1);
}
@@ -445,13 +447,6 @@
}
private long deleteAllLogFiles() {
- if (appendChannel != null) {
- try {
- appendChannel.close();
- } catch (IOException e) {
- throw new IllegalStateException("Failed to close a fileChannel of a log file");
- }
- }
txnLogFileId2ReaderCount.clear();
List<Long> logFileIds = getLogFileIds();
if (logFileIds != null) {
@@ -537,9 +532,22 @@
final long fileId = getLogFileId(lsn);
final Path targetFilePath = Paths.get(getLogFilePath(fileId));
final long targetPosition = getLogFileOffset(lsn);
- final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed by LogBuffer
+ final RandomAccessFile raf = new RandomAccessFile(targetFilePath.toFile(), "rw"); // NOSONAR closed when full
appendChannel = raf.getChannel();
appendChannel.position(targetPosition);
+ currentLogFileId = fileId;
+ }
+
+ private void closeCurrentLogFile() {
+ if (appendChannel != null && appendChannel.isOpen()) {
+ try {
+ LOGGER.info("closing current log file with id({})", currentLogFileId);
+ appendChannel.close();
+ } catch (IOException e) {
+ LOGGER.error(() -> "failed to close log file with id(" + currentLogFileId + ")", e);
+ throw new ACIDException(e);
+ }
+ }
}
@Override
@@ -563,14 +571,6 @@
}
@Override
- public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException {
- terminateLogFlusher();
- deleteAllLogFiles();
- long newLogFile = getLogFileId(LSNtoStartFrom);
- initializeLogManager(newLogFile + 1);
- }
-
- @Override
public void setReplicationManager(IReplicationManager replicationManager) {
throw new IllegalStateException("This log manager does not support replication");
}
@@ -687,7 +687,7 @@
}
@Override
- public Boolean call() throws InterruptedException {
+ public Boolean call() {
started.release();
boolean interrupted = false;
try {