[NO ISSUE][STO] Clean up invalid resources on bootstrap

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Add new node startup tasks to delete invalid resources.
- Ensure index max disk LSN is initialized during local
  recovery even if the index is already registered.

Change-Id: Ia13182e96ec65951e837f71ffc4db3e92a43a7b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12766
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..ffde7d0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
-        LOGGER.info("starting recovery ...");
+        LOGGER.info("starting recovery for partitions {}", partitions);
 
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -363,10 +363,8 @@
                                 datasetLifecycleManager.register(localResource.getPath(), index);
                                 datasetLifecycleManager.open(localResource.getPath());
                                 try {
-                                    final DatasetResourceReference resourceReference =
-                                            DatasetResourceReference.of(localResource);
-                                    maxDiskLastLsn =
-                                            indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+                                    maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                            indexCheckpointManagerProvider);
                                 } catch (HyracksDataException e) {
                                     datasetLifecycleManager.close(localResource.getPath());
                                     throw e;
@@ -374,7 +372,12 @@
                                 //#. set resourceId and maxDiskLastLSN to the map
                                 resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                             } else {
-                                maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                if (!resourceId2MaxLSNMap.containsKey(resourceId)) {
+                                    maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                            indexCheckpointManagerProvider);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
                             }
                             // lsn @ maxDiskLastLsn is either a flush log or a master replica log
                             if (lsn >= maxDiskLastLsn) {
@@ -858,6 +861,19 @@
         index.resetCurrentComponentIndex();
     }
 
+    private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException {
+        long maxDiskLastLsn;
+        try {
+            final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource);
+            maxDiskLastLsn = indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+        } catch (HyracksDataException e) {
+            datasetLifecycleManager.close(localResource.getPath());
+            throw e;
+        }
+        return maxDiskLastLsn;
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final long txnId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index dd1b6e7..655f9da 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -123,8 +123,6 @@
         if (!partitions.contains(partition)) {
             return;
         }
-        final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
-        datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
         closePartitionResources(partition);
         final List<IPartitionReplica> partitionReplicas = getReplicas(partition);
         for (IPartitionReplica replica : partitionReplicas) {
@@ -139,10 +137,12 @@
     }
 
     public void closePartitionResources(int partition) throws HyracksDataException {
+        final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
+        //TODO(mhubail) we can flush only datasets of the requested partition
+        datasetLifecycleManager.flushAllDatasets();
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
-        final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
         for (LocalResource resource : partitionResources.values()) {
             datasetLifecycleManager.closeIfOpen(resource.getPath());
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
new file mode 100644
index 0000000..5471fb8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class LocalStorageCleanupTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final int metadataPartitionId;
+
+    public LocalStorageCleanupTask(int metadataPartitionId) {
+        this.metadataPartitionId = metadataPartitionId;
+    }
+
+    @Override
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
+        INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
+        localResourceRepository.deleteInvalidIndexes(r -> {
+            DatasetLocalResource lr = (DatasetLocalResource) r.getResource();
+            return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
+                    && lr.getPartition() != metadataPartitionId;
+        });
+    }
+
+    @Override
+    public String toString() {
+        return "LocalStorageCleanupTask{" + "metadataPartitionId=" + metadataPartitionId + '}';
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index f164773..97e1a56 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.app.nc.task.CheckpointTask;
 import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
 import org.apache.asterix.app.nc.task.RetrieveLibrariesTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
@@ -213,6 +214,8 @@
             Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
+        int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
+        tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
         if (state == SystemState.CORRUPTED) {
             // need to perform local recovery for node active partitions
             LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
@@ -222,7 +225,7 @@
             tasks.add(new StartReplicationServiceTask());
         }
         if (metadataNode) {
-            tasks.add(new MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId()));
+            tasks.add(new MetadataBootstrapTask(metadataPartitionId));
         }
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 556ab6a..f753868 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -273,6 +273,24 @@
         return resourcesMap;
     }
 
+    public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
+        for (Path root : storageRoots) {
+            final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
+            try {
+                for (File file : files) {
+                    final LocalResource localResource = readLocalResource(file);
+                    if (filter.test(localResource)) {
+                        LOGGER.warn("deleting invalid metadata index {}", file.getParentFile());
+                        IoUtil.delete(file.getParentFile());
+                    }
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        resourceCache.invalidateAll();
+    }
+
     public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
         return getResources(p -> true);
     }