[NO ISSUE][STO] Add API to ensure minimum LSN

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add API in the log manager that can be used to advance the
  append LSN to a specific LSN.
- Add API to get the maximum LSN used in specific storage partitions.

Change-Id: Ic5f91f08a75f3737e2b33373cfd9500d78ba83d2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17934
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123..a7d30a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -520,6 +520,31 @@
     }
 
     @Override
+    public long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException {
+        final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = appCtx.getIndexCheckpointManagerProvider();
+        long maxLSN = 0;
+        for (Integer partition : partitions) {
+            final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> {
+                DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+                return dsResource.getPartition() == partition;
+            }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+                    .collect(Collectors.toList());
+            for (DatasetResourceReference indexRef : partitionResources) {
+                try {
+                    final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef);
+                    if (idxCheckpointMgr.isValidIndex()) {
+                        long indexMaxLsn = idxCheckpointMgrProvider.get(indexRef).getLowWatermark();
+                        maxLSN = Math.max(maxLSN, indexMaxLsn);
+                    }
+                } catch (Exception e) {
+                    LOGGER.warn("Failed to get max LSN of resource {}", indexRef, e);
+                }
+            }
+        }
+        return maxLSN;
+    }
+
+    @Override
     public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
             throws HyracksDataException {
         //replay logs > minLSN that belong to these partitions
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index d7e0885..9d89c07 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -99,7 +99,13 @@
     public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException;
 
     /**
-     * Deletes all current log files and start the next log file partition
+     * Deletes all current log files and start the next log file partition after {@code minLSN}
      */
-    void renewLogFiles();
+    void renewLogFiles(long minLSN);
+
+    /**
+     * Ensures the next lsn of this log manager is greater than {@code lsn}
+     * @param lsn
+     */
+    void ensureMinLSN(long lsn);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 8a5f34e..aef2215 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -110,6 +110,15 @@
     void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException;
 
     /**
+     * Gets the LSN used in {@code partitions} or 0 if no LSNs are found
+     *
+     * @param partitions
+     * @return the maximum used LSN
+     * @throws HyracksDataException
+     */
+    long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException;
+
+    /**
      * Replay the commited transactions' logs belonging to {@code partitions}. if {@code flush} is true,
      * all datasets are flushed after the logs are replayed.
      *
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index e66185c..a502b30 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -377,10 +377,11 @@
     }
 
     @Override
-    public void renewLogFiles() {
+    public void renewLogFiles(long minLSN) {
         terminateLogFlusher();
         closeCurrentLogFile();
-        long nextLogFileId = getNextLogFileId();
+        long nextLogFileId = getLogFileId(minLSN) + 1;
+        LOGGER.info("renewing txn log files; next file id {}", nextLogFileId);
         createFileIfNotExists(getLogFilePath(nextLogFileId));
         final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
         deleteOldLogFiles(logFileFirstLsn);
@@ -388,6 +389,15 @@
     }
 
     @Override
+    public void ensureMinLSN(long lsn) {
+        if (appendLSN.get() > lsn) {
+            LOGGER.info("current append lsn {} > target min LSN {}; not renewing log files", appendLSN.get(), lsn);
+            return;
+        }
+        renewLogFiles(lsn);
+    }
+
+    @Override
     public void deleteOldLogFiles(long checkpointLSN) {
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
         List<Long> logFileIds = getOrderedLogFileIds();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 460f393..6761ea9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -65,7 +65,8 @@
                 txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
         datasetLifecycleManager.flushAllDatasets();
         capture(SHARP_CHECKPOINT_LSN, true);
-        txnSubsystem.getLogManager().renewLogFiles();
+        long currentAppendLNS = txnSubsystem.getLogManager().getAppendLSN();
+        txnSubsystem.getLogManager().renewLogFiles(currentAppendLNS);
         LOGGER.info("Completed sharp checkpoint.");
     }