[NO ISSUE][TX] Create New Log File Before Deleting Old Files
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ensure the next log file is created after a sharp
checkpoint before deleting the old files. Otherwise, if
a crash happens right after the old files are deleted
but before the new one is created, the next time the
system starts up it will start with log file id 0,
which is wrong (see the sketch below).
- Log the details of the latest index checkpoint when
the low watermark of the new checkpoint is less than
the low watermark of the latest checkpoint.
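- A minimal sketch of the new ordering (not the actual
LogManager API; the class name, file name prefix, and
helpers below are made up for illustration): the next
log file is created first and the old files are deleted
only afterwards, so a crash in between can leave stale
files behind but never an empty log directory.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    /** Hypothetical stand-in for the log file switch logic. */
    public final class CrashSafeLogSwitch {

        private static final String LOG_FILE_PREFIX = "txnlog_"; // assumed name pattern

        private final Path logDir;

        public CrashSafeLogSwitch(Path logDir) {
            this.logDir = logDir;
        }

        /** Creates the next log file before deleting the old ones. */
        public long renew() throws IOException {
            List<Path> oldFiles = logFiles();
            long nextId = oldFiles.stream().mapToLong(CrashSafeLogSwitch::idOf).max().orElse(0L) + 1;
            Path nextFile = logDir.resolve(LOG_FILE_PREFIX + nextId);

            // Step 1: make the new file exist on disk. If we crash after this
            // point, the next startup still sees the highest file id used so far.
            if (!Files.exists(nextFile)) {
                Files.createFile(nextFile);
            }

            // Step 2: only now remove the files that are no longer needed.
            for (Path old : oldFiles) {
                Files.delete(old);
            }
            return nextId;
        }

        private List<Path> logFiles() throws IOException {
            try (Stream<Path> files = Files.list(logDir)) {
                return files.filter(p -> p.getFileName().toString().startsWith(LOG_FILE_PREFIX))
                        .collect(Collectors.toList());
            }
        }

        private static long idOf(Path logFile) {
            return Long.parseLong(logFile.getFileName().toString().substring(LOG_FILE_PREFIX.length()));
        }
    }
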
Change-Id: I4817f697b43daff55726909ab074ec30a1c224ce
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2959
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 22dec60..6009f51 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -65,7 +65,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.stubbing.Answer;
public class CheckpointingTest {
@@ -134,7 +133,7 @@
ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
// Number of log files after node startup should be one
- int numberOfLogFiles = logManager.getLogFileIds().size();
+ int numberOfLogFiles = logManager.getOrderedLogFileIds().size();
Assert.assertEquals(1, numberOfLogFiles);
// Low-water mark LSN
@@ -142,10 +141,10 @@
// Low-water mark log file id
long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN);
// Initial Low-water mark should be in the only available log file
- Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue());
+ Assert.assertEquals(initialLowWaterMarkFileId, logManager.getOrderedLogFileIds().get(0).longValue());
// Insert records until a new log file is created
- while (logManager.getLogFileIds().size() == 1) {
+ while (logManager.getOrderedLogFileIds().size() == 1) {
ITupleReference tuple = tupleGenerator.next();
DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
}
@@ -160,9 +159,9 @@
* the low-water mark is still in it (i.e. it is still required for
* recovery)
*/
- int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
+ int numberOfLogFilesBeforeCheckpoint = logManager.getOrderedLogFileIds().size();
checkpointManager.tryCheckpoint(logManager.getAppendLSN());
- int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
+ int numberOfLogFilesAfterCheckpoint = logManager.getOrderedLogFileIds().size();
Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
/*
@@ -203,7 +202,7 @@
checkpointManager.tryCheckpoint(lowWaterMarkLSN);
// Validate initialLowWaterMarkFileId was deleted
- for (Long fileId : logManager.getLogFileIds()) {
+ for (Long fileId : logManager.getOrderedLogFileIds()) {
Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index 4453a1d..964bf66 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -147,7 +147,7 @@
public void interruptedLogFileSwitch() throws Exception {
final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager();
- int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+ int logFileCountBeforeInterrupt = logManager.getOrderedLogFileIds().size();
// ensure an interrupted transactor will create next log file but will fail to position the log channel
final AtomicBoolean failed = new AtomicBoolean(false);
@@ -162,7 +162,7 @@
interruptedTransactor.start();
interruptedTransactor.join();
// ensure a new log file was created and survived interrupt
- int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+ int logFileCountAfterInterrupt = logManager.getOrderedLogFileIds().size();
Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
Assert.assertFalse(failed.get());
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index f84167e..9654473 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -24,6 +24,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,6 +33,7 @@
public class IndexCheckpoint {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
private long id;
@@ -52,6 +55,9 @@
public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
long lastComponentId) {
if (lowWatermark < latest.getLowWatermark()) {
+ if (LOGGER.isErrorEnabled()) {
+ LOGGER.error("low watermark {} is lower than the low watermark of the latest checkpoint {}", lowWatermark, latest);
+ }
throw new IllegalStateException("Low watermark should always be increasing");
}
IndexCheckpoint next = new IndexCheckpoint();
@@ -104,4 +110,13 @@
throw HyracksDataException.create(e);
}
}
+
+ @Override
+ public String toString() {
+ try {
+ return asJson();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index c0d18df..0a6dda9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -29,7 +29,6 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -103,7 +102,8 @@
nodeId = txnSubsystem.getId();
flushLogsQ = new LinkedBlockingQueue<>();
txnSubsystem.getApplicationContext().getThreadExecutor().execute(new FlushLogsLogger());
- initializeLogManager(SMALLEST_LOG_FILE_ID);
+ final long onDiskMaxLogFileId = getOnDiskMaxLogFileId();
+ initializeLogManager(onDiskMaxLogFileId);
}
private void initializeLogManager(long nextLogFileId) {
@@ -365,56 +365,32 @@
}
}
- private long initializeLogAnchor(long nextLogFileId) {
- long fileId = 0;
- long offset = 0;
- File fileLogDir = new File(logDir);
- try {
- if (fileLogDir.exists()) {
- List<Long> logFileIds = getLogFileIds();
- if (logFileIds.isEmpty()) {
- fileId = nextLogFileId;
- createFileIfNotExists(getLogFilePath(fileId));
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("created a log file: " + getLogFilePath(fileId));
- }
- } else {
- fileId = logFileIds.get(logFileIds.size() - 1);
- File logFile = new File(getLogFilePath(fileId));
- offset = logFile.length();
- }
- } else {
- fileId = nextLogFileId;
- createNewDirectory(logDir);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
- }
- createFileIfNotExists(getLogFilePath(fileId));
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("created a log file: " + getLogFilePath(fileId));
- }
- }
- } catch (IOException ioe) {
- throw new IllegalStateException("Failed to initialize the log anchor", ioe);
- }
+ private long initializeLogAnchor(long fileId) {
+ final String logFilePath = getLogFilePath(fileId);
+ createFileIfNotExists(logFilePath);
+ final File logFile = new File(logFilePath);
+ long offset = logFile.length();
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("log file Id: " + fileId + ", offset: " + offset);
+ LOGGER.info("initializing log anchor with log file Id: {} at offset: {}", fileId, offset);
}
- return logFileSize * fileId + offset;
+ return getLogFileFirstLsn(fileId) + offset;
}
@Override
public void renewLogFiles() {
terminateLogFlusher();
closeCurrentLogFile();
- long lastMaxLogFileId = deleteAllLogFiles();
- initializeLogManager(lastMaxLogFileId + 1);
+ long nextLogFileId = getNextLogFileId();
+ createFileIfNotExists(getLogFilePath(nextLogFileId));
+ final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
+ deleteOldLogFiles(logFileFirstLsn);
+ initializeLogManager(nextLogFileId);
}
@Override
public void deleteOldLogFiles(long checkpointLSN) {
Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
- List<Long> logFileIds = getLogFileIds();
+ List<Long> logFileIds = getOrderedLogFileIds();
if (!logFileIds.isEmpty()) {
//sort log files from oldest to newest
Collections.sort(logFileIds);
@@ -461,24 +437,7 @@
}
}
- private long deleteAllLogFiles() {
- List<Long> logFileIds = getLogFileIds();
- if (!logFileIds.isEmpty()) {
- for (Long id : logFileIds) {
- File file = new File(getLogFilePath(id));
- LOGGER.info("Deleting log file: " + file.getAbsolutePath());
- if (!file.delete()) {
- throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
- }
- LOGGER.info("log file: " + file.getAbsolutePath() + " was deleted successfully");
- }
- return logFileIds.get(logFileIds.size() - 1);
- } else {
- throw new IllegalStateException("Couldn't find any log files.");
- }
- }
-
- public List<Long> getLogFileIds() {
+ public List<Long> getOrderedLogFileIds() {
File fileLogDir = new File(logDir);
String[] logFileNames = null;
List<Long> logFileIds = null;
@@ -510,12 +469,7 @@
for (String fileName : logFileNames) {
logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
}
- Collections.sort(logFileIds, new Comparator<Long>() {
- @Override
- public int compare(Long arg0, Long arg1) {
- return arg0.compareTo(arg1);
- }
- });
+ logFileIds.sort(Long::compareTo);
return logFileIds;
}
@@ -531,17 +485,21 @@
return lsn / logFileSize;
}
- private static boolean createFileIfNotExists(String path) throws IOException {
- File file = new File(path);
- File parentFile = file.getParentFile();
- if (parentFile != null) {
- parentFile.mkdirs();
+ private static void createFileIfNotExists(String path) {
+ try {
+ File file = new File(path);
+ if (file.exists()) {
+ return;
+ }
+ File parentFile = file.getParentFile();
+ if (parentFile != null) {
+ parentFile.mkdirs();
+ }
+ Files.createFile(file.toPath());
+ LOGGER.info("Created log file {}", path);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to create file " + path, e);
}
- return file.createNewFile();
- }
-
- private static boolean createNewDirectory(String path) {
- return (new File(path)).mkdir();
}
private void createNextLogFile() throws IOException {
@@ -579,7 +537,7 @@
@Override
public long getReadableSmallestLSN() {
- List<Long> logFileIds = getLogFileIds();
+ List<Long> logFileIds = getOrderedLogFileIds();
if (!logFileIds.isEmpty()) {
return logFileIds.get(0) * logFileSize;
} else {
@@ -629,6 +587,22 @@
fileChannel.close();
}
+ private long getNextLogFileId() {
+ return getOnDiskMaxLogFileId() + 1;
+ }
+
+ private long getLogFileFirstLsn(long logFileId) {
+ return logFileId * logFileSize;
+ }
+
+ private long getOnDiskMaxLogFileId() {
+ final List<Long> logFileIds = getOrderedLogFileIds();
+ if (logFileIds.isEmpty()) {
+ return SMALLEST_LOG_FILE_ID;
+ }
+ return logFileIds.get(logFileIds.size() - 1);
+ }
+
/**
* This class is used to log FLUSH logs.
* FLUSH logs are flushed on a different thread to avoid a possible deadlock in {@link LogBuffer} batchUnlock