[NO ISSUE][TX] Create New Log File Before Deleting Old Files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure next log file is created after a sharp check
  point before deleting old files. This is to prevent
  the case if a crash happens right after deleting the
  old files but before creating the new one, then the
  next time the system starts up, it will start with
  log file id 0 which is wrong.
- Log the details of latest index checkpoint when the
  low watermark of the new checkpoint is less than the
  low watermakr of the latest checkpoint.

Change-Id: I4817f697b43daff55726909ab074ec30a1c224ce
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2959
Reviewed-by: Michael Blow <mblow@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 22dec60..6009f51 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -65,7 +65,6 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.stubbing.Answer;
 
 public class CheckpointingTest {
 
@@ -134,7 +133,7 @@
                 ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
-                int numberOfLogFiles = logManager.getLogFileIds().size();
+                int numberOfLogFiles = logManager.getOrderedLogFileIds().size();
                 Assert.assertEquals(1, numberOfLogFiles);
 
                 // Low-water mark LSN
@@ -142,10 +141,10 @@
                 // Low-water mark log file id
                 long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN);
                 // Initial Low-water mark should be in the only available log file
-                Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue());
+                Assert.assertEquals(initialLowWaterMarkFileId, logManager.getOrderedLogFileIds().get(0).longValue());
 
                 // Insert records until a new log file is created
-                while (logManager.getLogFileIds().size() == 1) {
+                while (logManager.getOrderedLogFileIds().size() == 1) {
                     ITupleReference tuple = tupleGenerator.next();
                     DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
                 }
@@ -160,9 +159,9 @@
                      * the low-water mark is still in it (i.e. it is still required for
                      * recovery)
                      */
-                    int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
+                    int numberOfLogFilesBeforeCheckpoint = logManager.getOrderedLogFileIds().size();
                     checkpointManager.tryCheckpoint(logManager.getAppendLSN());
-                    int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
+                    int numberOfLogFilesAfterCheckpoint = logManager.getOrderedLogFileIds().size();
                     Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
 
                     /*
@@ -203,7 +202,7 @@
 
                     checkpointManager.tryCheckpoint(lowWaterMarkLSN);
                     // Validate initialLowWaterMarkFileId was deleted
-                    for (Long fileId : logManager.getLogFileIds()) {
+                    for (Long fileId : logManager.getOrderedLogFileIds()) {
                         Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
                     }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index 4453a1d..964bf66 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -147,7 +147,7 @@
     public void interruptedLogFileSwitch() throws Exception {
         final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
         final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager();
-        int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+        int logFileCountBeforeInterrupt = logManager.getOrderedLogFileIds().size();
 
         // ensure an interrupted transactor will create next log file but will fail to position the log channel
         final AtomicBoolean failed = new AtomicBoolean(false);
@@ -162,7 +162,7 @@
         interruptedTransactor.start();
         interruptedTransactor.join();
         // ensure a new log file was created and survived interrupt
-        int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+        int logFileCountAfterInterrupt = logManager.getOrderedLogFileIds().size();
         Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
         Assert.assertFalse(failed.get());
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index f84167e..9654473 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -24,6 +24,8 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,6 +33,7 @@
 
 public class IndexCheckpoint {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final long INITIAL_CHECKPOINT_ID = 0;
     private long id;
@@ -52,6 +55,9 @@
     public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
             long lastComponentId) {
         if (lowWatermark < latest.getLowWatermark()) {
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error("low watermark {} less than the latest checkpoint low watermark {}", lowWatermark, latest);
+            }
             throw new IllegalStateException("Low watermark should always be increasing");
         }
         IndexCheckpoint next = new IndexCheckpoint();
@@ -104,4 +110,13 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    @Override
+    public String toString() {
+        try {
+            return asJson();
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index c0d18df..0a6dda9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -29,7 +29,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -103,7 +102,8 @@
         nodeId = txnSubsystem.getId();
         flushLogsQ = new LinkedBlockingQueue<>();
         txnSubsystem.getApplicationContext().getThreadExecutor().execute(new FlushLogsLogger());
-        initializeLogManager(SMALLEST_LOG_FILE_ID);
+        final long onDiskMaxLogFileId = getOnDiskMaxLogFileId();
+        initializeLogManager(onDiskMaxLogFileId);
     }
 
     private void initializeLogManager(long nextLogFileId) {
@@ -365,56 +365,32 @@
         }
     }
 
-    private long initializeLogAnchor(long nextLogFileId) {
-        long fileId = 0;
-        long offset = 0;
-        File fileLogDir = new File(logDir);
-        try {
-            if (fileLogDir.exists()) {
-                List<Long> logFileIds = getLogFileIds();
-                if (logFileIds.isEmpty()) {
-                    fileId = nextLogFileId;
-                    createFileIfNotExists(getLogFilePath(fileId));
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("created a log file: " + getLogFilePath(fileId));
-                    }
-                } else {
-                    fileId = logFileIds.get(logFileIds.size() - 1);
-                    File logFile = new File(getLogFilePath(fileId));
-                    offset = logFile.length();
-                }
-            } else {
-                fileId = nextLogFileId;
-                createNewDirectory(logDir);
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
-                }
-                createFileIfNotExists(getLogFilePath(fileId));
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("created a log file: " + getLogFilePath(fileId));
-                }
-            }
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Failed to initialize the log anchor", ioe);
-        }
+    private long initializeLogAnchor(long fileId) {
+        final String logFilePath = getLogFilePath(fileId);
+        createFileIfNotExists(logFilePath);
+        final File logFile = new File(logFilePath);
+        long offset = logFile.length();
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("log file Id: " + fileId + ", offset: " + offset);
+            LOGGER.info("initializing log anchor with log file Id: {} at offset: {}", fileId, offset);
         }
-        return logFileSize * fileId + offset;
+        return getLogFileFirstLsn(fileId) + offset;
     }
 
     @Override
     public void renewLogFiles() {
         terminateLogFlusher();
         closeCurrentLogFile();
-        long lastMaxLogFileId = deleteAllLogFiles();
-        initializeLogManager(lastMaxLogFileId + 1);
+        long nextLogFileId = getNextLogFileId();
+        createFileIfNotExists(getLogFilePath(nextLogFileId));
+        final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
+        deleteOldLogFiles(logFileFirstLsn);
+        initializeLogManager(nextLogFileId);
     }
 
     @Override
     public void deleteOldLogFiles(long checkpointLSN) {
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
-        List<Long> logFileIds = getLogFileIds();
+        List<Long> logFileIds = getOrderedLogFileIds();
         if (!logFileIds.isEmpty()) {
             //sort log files from oldest to newest
             Collections.sort(logFileIds);
@@ -461,24 +437,7 @@
         }
     }
 
-    private long deleteAllLogFiles() {
-        List<Long> logFileIds = getLogFileIds();
-        if (!logFileIds.isEmpty()) {
-            for (Long id : logFileIds) {
-                File file = new File(getLogFilePath(id));
-                LOGGER.info("Deleting log file: " + file.getAbsolutePath());
-                if (!file.delete()) {
-                    throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
-                }
-                LOGGER.info("log file: " + file.getAbsolutePath() + " was deleted successfully");
-            }
-            return logFileIds.get(logFileIds.size() - 1);
-        } else {
-            throw new IllegalStateException("Couldn't find any log files.");
-        }
-    }
-
-    public List<Long> getLogFileIds() {
+    public List<Long> getOrderedLogFileIds() {
         File fileLogDir = new File(logDir);
         String[] logFileNames = null;
         List<Long> logFileIds = null;
@@ -510,12 +469,7 @@
         for (String fileName : logFileNames) {
             logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
         }
-        Collections.sort(logFileIds, new Comparator<Long>() {
-            @Override
-            public int compare(Long arg0, Long arg1) {
-                return arg0.compareTo(arg1);
-            }
-        });
+        logFileIds.sort(Long::compareTo);
         return logFileIds;
     }
 
@@ -531,17 +485,21 @@
         return lsn / logFileSize;
     }
 
-    private static boolean createFileIfNotExists(String path) throws IOException {
-        File file = new File(path);
-        File parentFile = file.getParentFile();
-        if (parentFile != null) {
-            parentFile.mkdirs();
+    private static void createFileIfNotExists(String path) {
+        try {
+            File file = new File(path);
+            if (file.exists()) {
+                return;
+            }
+            File parentFile = file.getParentFile();
+            if (parentFile != null) {
+                parentFile.mkdirs();
+            }
+            Files.createFile(file.toPath());
+            LOGGER.info("Created log file {}", path);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to create file in " + path, e);
         }
-        return file.createNewFile();
-    }
-
-    private static boolean createNewDirectory(String path) {
-        return (new File(path)).mkdir();
     }
 
     private void createNextLogFile() throws IOException {
@@ -579,7 +537,7 @@
 
     @Override
     public long getReadableSmallestLSN() {
-        List<Long> logFileIds = getLogFileIds();
+        List<Long> logFileIds = getOrderedLogFileIds();
         if (!logFileIds.isEmpty()) {
             return logFileIds.get(0) * logFileSize;
         } else {
@@ -629,6 +587,22 @@
         fileChannel.close();
     }
 
+    private long getNextLogFileId() {
+        return getOnDiskMaxLogFileId() + 1;
+    }
+
+    private long getLogFileFirstLsn(long logFileId) {
+        return logFileId * logFileSize;
+    }
+
+    private long getOnDiskMaxLogFileId() {
+        final List<Long> logFileIds = getOrderedLogFileIds();
+        if (logFileIds.isEmpty()) {
+            return SMALLEST_LOG_FILE_ID;
+        }
+        return logFileIds.get(logFileIds.size() - 1);
+    }
+
     /**
      * This class is used to log FLUSH logs.
      * FLUSH logs are flushed on a different thread to avoid a possible deadlock in {@link LogBuffer} batchUnlock