[ASTERIXDB-3556][STO] Ensure local recovery is run on resume

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Set the system state to CORRUPTED on resume to ensure
  local recovery is run.
- Ensure cached file cleanup is always performed whenever
  any cached files exist locally.

Ext-ref: MB-65062
Change-Id: I6600823447f478fc70a22cf27a9967235570562c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19373
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ritik Raj <raj.ritik9835@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 77a1e01..78f574b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -158,9 +158,13 @@
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
         LOGGER.info("starting recovery for partitions {}", partitions);
+        Checkpoint systemCheckpoint = checkpointManager.getLatest();
+        if (systemCheckpoint == null) {
+            LOGGER.warn("no system checkpoint found; skipping txn log recovery");
+            return;
+        }
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        Checkpoint checkpointObject = checkpointManager.getLatest();
-        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+        long lowWaterMarkLSN = systemCheckpoint.getMinMCTFirstLsn();
         if (lowWaterMarkLSN < readableSmallestLSN) {
             lowWaterMarkLSN = readableSmallestLSN;
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index b47703a..cc59488 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -41,14 +41,12 @@
     private final Set<Integer> storagePartitions;
     private final boolean metadataNode;
     private final int metadataPartitionId;
-    private final boolean cleanup;
 
-    public CloudToLocalStorageCachingTask(Set<Integer> storagePartitions, boolean metadataNode, int metadataPartitionId,
-            boolean cleanup) {
+    public CloudToLocalStorageCachingTask(Set<Integer> storagePartitions, boolean metadataNode,
+            int metadataPartitionId) {
         this.storagePartitions = storagePartitions;
         this.metadataNode = metadataNode;
         this.metadataPartitionId = metadataPartitionId;
-        this.cleanup = cleanup;
     }
 
     @Override
@@ -68,7 +66,7 @@
                     applicationContext.getTransactionSubsystem().getCheckpointManager().getLatest();
             IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
             bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId,
-                    cleanup, latestCheckpoint == null);
+                    latestCheckpoint == null);
 
             // Report all local resources
             diskService.reportLocalResources(lrs.loadAndGetAllResources());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index d0a8dcc..b8addb2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -42,7 +42,9 @@
     public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         try {
-            appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
+            if (!partitions.isEmpty()) {
+                appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
+            }
         } catch (IOException | ACIDException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index dde2371..fff1bc1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -223,9 +223,9 @@
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
         int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
         // Add any cloud-related tasks
-        addCloudTasks(tasks, nodeActivePartitions, metadataNode, metadataPartitionId, state == SystemState.CORRUPTED);
+        addCloudTasks(tasks, nodeActivePartitions, metadataNode, metadataPartitionId);
         tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
-        if (state == SystemState.CORRUPTED) {
+        if (state == SystemState.CORRUPTED && !nodeActivePartitions.isEmpty()) {
             // need to perform local recovery for node active partitions
             LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
             tasks.add(rt);
@@ -257,7 +257,7 @@
     }
 
     protected void addCloudTasks(List<INCLifecycleTask> tasks, Set<Integer> computePartitions, boolean metadataNode,
-            int metadataPartitionId, boolean cleanup) {
+            int metadataPartitionId) {
         IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
         if (!appCtx.isCloudDeployment()) {
             return;
@@ -266,7 +266,7 @@
         StorageComputePartitionsMap map = clusterManager.getStorageComputeMap();
         map = map == null ? StorageComputePartitionsMap.computePartitionsMap(clusterManager) : map;
         Set<Integer> storagePartitions = map.getStoragePartitions(computePartitions);
-        tasks.add(new CloudToLocalStorageCachingTask(storagePartitions, metadataNode, metadataPartitionId, cleanup));
+        tasks.add(new CloudToLocalStorageCachingTask(storagePartitions, metadataNode, metadataPartitionId));
     }
 
     private synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 4ce382e..912ca47 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -45,7 +45,7 @@
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.common.metadata.MetadataConstants;
-import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -106,22 +106,23 @@
      */
 
     @Override
-    public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
+    public SystemState getSystemStateOnMissingCheckpoint() {
         Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles();
         CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
         if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
             LOGGER.info("First time to initialize this cluster: systemState = PERMANENT_DATA_LOSS");
-            return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
+            return SystemState.PERMANENT_DATA_LOSS;
         } else {
-            LOGGER.info("Resuming a previous initialized cluster: systemState = HEALTHY");
-            return IRecoveryManager.SystemState.HEALTHY;
+            LOGGER.info(
+                    "Resuming a previously initialized cluster; setting system state to {} to force local recovery if needed",
+                    SystemState.CORRUPTED);
+            return SystemState.CORRUPTED;
         }
     }
 
     @Override
     public final void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
-            boolean metadataNode, int metadataPartition, boolean cleanup, boolean ensureCompleteBootstrap)
-            throws HyracksDataException {
+            boolean metadataNode, int metadataPartition, boolean ensureCompleteBootstrap) throws HyracksDataException {
         partitions.clear();
         partitions.addAll(activePartitions);
         if (metadataNode) {
@@ -138,8 +139,7 @@
         }
 
         LOGGER.info("Initializing cloud manager with ({}) storage partitions: {}", partitions.size(), partitions);
-
-        if (cleanup) {
+        if (!currentOnDiskPartitions.isEmpty()) {
             deleteUnkeptPartitionDirs(currentOnDiskPartitions);
             cleanupLocalFiles();
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
index 54090bb..f2451ee 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
@@ -49,8 +49,7 @@
 
     @Override
     public void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
-            boolean metadataNode, int metadataPartition, boolean cleanup, boolean ensureCompleteBootstrap)
-            throws HyracksDataException {
+            boolean metadataNode, int metadataPartition, boolean ensureCompleteBootstrap) throws HyracksDataException {
         for (FileReference onDiskPartition : currentOnDiskPartitions) {
             int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (!activePartitions.contains(partitionNum)) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
index 6bb4176..73a1392 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -50,9 +50,8 @@
      * @param currentOnDiskPartitions paths to the current local partitions
      * @param metadataNode            whether the node is a metadata node as well
      * @param metadataPartition       metadata partition number
-     * @param cleanup                 performs cleanup by deleting all unkept partitions
      * @param ensureCompleteBootstrap ensures the metadata catalog was fully bootstrapped
      */
     void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode,
-            int metadataPartition, boolean cleanup, boolean ensureCompleteBootstrap) throws HyracksDataException;
+            int metadataPartition, boolean ensureCompleteBootstrap) throws HyracksDataException;
 }