[NO ISSUE][STO] Add API to ensure minimum LSN
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add API in the log manager that can be used to advance the
append LSN to a specific LSN.
- Add API to get the maximum LSN used in specific storage partitions.
Change-Id: Ic5f91f08a75f3737e2b33373cfd9500d78ba83d2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17934
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123..a7d30a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -520,6 +520,31 @@
}
@Override
+ public long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException {
+ final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = appCtx.getIndexCheckpointManagerProvider();
+ long maxLSN = 0;
+ for (Integer partition : partitions) {
+ final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> {
+ DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+ return dsResource.getPartition() == partition;
+ }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+ .collect(Collectors.toList());
+ for (DatasetResourceReference indexRef : partitionResources) {
+ try {
+ final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef);
+ if (idxCheckpointMgr.isValidIndex()) {
+ long indexMaxLsn = idxCheckpointMgrProvider.get(indexRef).getLowWatermark();
+ maxLSN = Math.max(maxLSN, indexMaxLsn);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get max LSN of resource {}", indexRef, e);
+ }
+ }
+ }
+ return maxLSN;
+ }
+
+ @Override
public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
throws HyracksDataException {
//replay logs > minLSN that belong to these partitions
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index d7e0885..9d89c07 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -99,7 +99,13 @@
public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException;
/**
- * Deletes all current log files and start the next log file partition
+ * Deletes all current log files and start the next log file partition after {@code minLSN}
*/
- void renewLogFiles();
+ void renewLogFiles(long minLSN);
+
+ /**
+ * Ensures the next lsn of this log manager is greater than {@code lsn}
+ * @param lsn
+ */
+ void ensureMinLSN(long lsn);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 8a5f34e..aef2215 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -110,6 +110,15 @@
void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException;
/**
+ * Gets the LSN used in {@code partitions} or 0 if no LSNs are found
+ *
+ * @param partitions
+ * @return the maximum used LSN
+ * @throws HyracksDataException
+ */
+ long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException;
+
+ /**
* Replay the commited transactions' logs belonging to {@code partitions}. if {@code flush} is true,
* all datasets are flushed after the logs are replayed.
*
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index e66185c..a502b30 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -377,10 +377,11 @@
}
@Override
- public void renewLogFiles() {
+ public void renewLogFiles(long minLSN) {
terminateLogFlusher();
closeCurrentLogFile();
- long nextLogFileId = getNextLogFileId();
+ long nextLogFileId = getLogFileId(minLSN) + 1;
+ LOGGER.info("renewing txn log files; next file id {}", nextLogFileId);
createFileIfNotExists(getLogFilePath(nextLogFileId));
final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
deleteOldLogFiles(logFileFirstLsn);
@@ -388,6 +389,15 @@
}
@Override
+ public void ensureMinLSN(long lsn) {
+ if (appendLSN.get() > lsn) {
+ LOGGER.info("current append lsn {} > target min LSN {}; not renewing log files", appendLSN.get(), lsn);
+ return;
+ }
+ renewLogFiles(lsn);
+ }
+
+ @Override
public void deleteOldLogFiles(long checkpointLSN) {
Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
List<Long> logFileIds = getOrderedLogFileIds();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 460f393..6761ea9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -65,7 +65,8 @@
txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
datasetLifecycleManager.flushAllDatasets();
capture(SHARP_CHECKPOINT_LSN, true);
- txnSubsystem.getLogManager().renewLogFiles();
+ long currentAppendLNS = txnSubsystem.getLogManager().getAppendLSN();
+ txnSubsystem.getLogManager().renewLogFiles(currentAppendLNS);
LOGGER.info("Completed sharp checkpoint.");
}