[ASTERIXDB-3563][STO] Delay activation of dataset until accessed

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
activate the dataset when its being accessed in cloud mode.

Ext-ref: MB-63037
Change-Id: If64a7fbd9701772ffa42761d1557223a3eea95be
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19443
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ritik Raj <ritik.raj@couchbase.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ritik Raj <ritik.raj@couchbase.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index de4a066..007826b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -286,8 +286,9 @@
         // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
         // the metadata bootstrap task
         ((ILifeCycleComponent) virtualBufferCache).start();
-        datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
-                txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier);
+        datasetLifecycleManager =
+                new DatasetLifecycleManager(ncServiceContext, storageProperties, localResourceRepository, recoveryMgr,
+                        txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier);
         localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
         final String nodeId = getServiceContext().getNodeId();
         final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 179b996..269f6f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -901,6 +901,16 @@
         return maxDiskLastLsn;
     }
 
+    @Override
+    public boolean isLazyRecoveryEnabled() {
+        return false;
+    }
+
+    @Override
+    public void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException {
+        // do-nothing
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final long txnId;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 43b5d1b..85916b0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -43,12 +44,16 @@
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -67,22 +72,26 @@
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
+    protected final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
     private final StorageProperties storageProperties;
-    private final ILocalResourceRepository resourceRepository;
+    protected final ILocalResourceRepository resourceRepository;
     private final IVirtualBufferCache vbc;
+    protected final INCServiceContext serviceCtx;
+    protected final IRecoveryManager recoveryMgr;
     private final ILogManager logManager;
     private final LogRecord waitLog;
-    private final IDiskResourceCacheLockNotifier lockNotifier;
+    protected final IDiskResourceCacheLockNotifier lockNotifier;
     private volatile boolean stopped = false;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     // all LSM-trees share the same virtual buffer cache list
     private final List<IVirtualBufferCache> vbcs;
 
-    public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
-            ILogManager logManager, IVirtualBufferCache vbc,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+    public DatasetLifecycleManager(INCServiceContext serviceCtx, StorageProperties storageProperties,
+            ILocalResourceRepository resourceRepository, IRecoveryManager recoveryMgr, ILogManager logManager,
+            IVirtualBufferCache vbc, IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
             IDiskResourceCacheLockNotifier lockNotifier) {
+        this.serviceCtx = serviceCtx;
+        this.recoveryMgr = recoveryMgr;
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -130,7 +139,7 @@
         datasetResource.register(resource, (ILSMIndex) index);
     }
 
-    private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
+    protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
         LocalResource lr = resourceRepository.get(resourcePath);
         if (lr == null) {
             return -1;
@@ -138,7 +147,7 @@
         return ((DatasetLocalResource) lr.getResource()).getDatasetId();
     }
 
-    private long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
+    protected long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException {
         LocalResource lr = resourceRepository.get(resourcePath);
         if (lr == null) {
             return -1;
@@ -146,6 +155,14 @@
         return lr.getId();
     }
 
+    private DatasetLocalResource getDatasetLocalResource(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.get(resourcePath);
+        if (lr == null) {
+            return null;
+        }
+        return (DatasetLocalResource) lr.getResource();
+    }
+
     @Override
     public synchronized void unregister(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
@@ -193,6 +210,72 @@
     @Override
     public synchronized void open(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
+        DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
+        if (localResource == null) {
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+        }
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+        lockNotifier.onOpen(resourceID);
+        try {
+            DatasetResource datasetResource = datasets.get(did);
+            int partition = localResource.getPartition();
+            if (shouldRecoverLazily(datasetResource, partition)) {
+                performLocalRecovery(resourcePath, datasetResource, partition);
+            } else {
+                openResource(resourcePath, false);
+            }
+        } finally {
+            lockNotifier.onClose(resourceID);
+        }
+    }
+
+    private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
+            throws HyracksDataException {
+        LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
+                partition);
+        FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath);
+        Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
+
+        List<ILSMIndex> indexes = new ArrayList<>();
+        for (LocalResource resource : resources.values()) {
+            if (shouldSkipResource(resource)) {
+                continue;
+            }
+
+            ILSMIndex index = getOrCreateIndex(resource);
+            boolean undoTouch = !resourcePath.equals(resource.getPath());
+            openResource(resource.getPath(), undoTouch);
+            indexes.add(index);
+        }
+
+        if (!indexes.isEmpty()) {
+            recoveryMgr.recoverIndexes(indexes);
+        }
+
+        datasetResource.markRecovered(partition);
+    }
+
+    private boolean shouldSkipResource(LocalResource resource) {
+        DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+        return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
+                || (lr.getResource() instanceof LSMBTreeLocalResource
+                        && ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance());
+    }
+
+    private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException {
+        ILSMIndex index = get(resource.getPath());
+        if (index == null) {
+            DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+            index = (ILSMIndex) lr.createInstance(serviceCtx);
+            register(resource.getPath(), index);
+        }
+        return index;
+    }
+
+    private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
 
@@ -214,15 +297,36 @@
 
         dsr.open(true);
         dsr.touch();
-
-        if (!iInfo.isOpen()) {
-            ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
-            synchronized (opTracker) {
-                iInfo.getIndex().activate();
+        boolean indexTouched = false;
+        try {
+            if (!iInfo.isOpen()) {
+                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
+                synchronized (opTracker) {
+                    iInfo.getIndex().activate();
+                }
+                iInfo.setOpen(true);
             }
-            iInfo.setOpen(true);
+            iInfo.touch();
+            indexTouched = true;
+        } finally {
+            if (undoTouch) {
+                dsr.untouch();
+                if (indexTouched) {
+                    iInfo.untouch();
+                }
+                lockNotifier.onClose(resourceID);
+            }
         }
-        iInfo.touch();
+    }
+
+    private boolean shouldRecoverLazily(DatasetResource resource, int partition) {
+        // Perform lazy recovery only if the following conditions are met:
+        // 1. Lazy recovery is enabled.
+        // 2. The resource does not belong to the Metadata dataverse.
+        // 3. The partition is being accessed for the first time.
+        return recoveryMgr.isLazyRecoveryEnabled()
+                && !MetadataIndexImmutableProperties.isMetadataDataset(resource.getDatasetID())
+                && !resource.isRecovered(partition);
     }
 
     public DatasetResource getDatasetLifecycle(int did) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index db9eabb..8e3081d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -20,7 +20,9 @@
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -48,12 +50,14 @@
     private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
     private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
     private final Map<Integer, IRateLimiter> datasetRateLimiters;
+    private final Set<Integer> recoveredPartitions;
 
     public DatasetResource(DatasetInfo datasetInfo) {
         this.datasetInfo = datasetInfo;
         this.datasetPrimaryOpTrackers = new HashMap<>();
         this.datasetComponentIdGenerators = new HashMap<>();
         this.datasetRateLimiters = new HashMap<>();
+        this.recoveredPartitions = new HashSet<>();
     }
 
     public boolean isRegistered() {
@@ -127,6 +131,10 @@
         return datasetComponentIdGenerators.get(partition);
     }
 
+    public boolean isRecovered(int partitionId) {
+        return recoveredPartitions.contains(partitionId);
+    }
+
     public IRateLimiter getRateLimiter(int partition) {
         return datasetRateLimiters.get(partition);
     }
@@ -139,6 +147,14 @@
         datasetPrimaryOpTrackers.put(partition, opTracker);
     }
 
+    public void markRecovered(int partition) {
+        if (recoveredPartitions.contains(partition)) {
+            throw new IllegalStateException(
+                    "Index has already been recovered for dataset" + getDatasetID() + "partition " + partition);
+        }
+        recoveredPartitions.add(partition);
+    }
+
     public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) {
         if (datasetComponentIdGenerators.containsKey(partition)) {
             throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition);
@@ -187,5 +203,6 @@
         datasetPrimaryOpTrackers.remove(partitionId);
         datasetComponentIdGenerators.remove(partitionId);
         datasetRateLimiters.remove(partitionId);
+        recoveredPartitions.remove(partitionId);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index a5f79ac..3437d42 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -20,10 +20,12 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 /**
  * Provides API for failure recovery. Failure could be at application level and
@@ -128,4 +130,19 @@
      */
     void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException;
 
+    /**
+     * Ensures that {@code datasetPartitionIndexes} are consistent by performing component id level recovery
+     *
+     * @param datasetPartitionIndexes A list of indexes associated with a specific
+     *                                dataset partition that require recovery.
+     * @throws HyracksDataException If an error occurs during the recovery or rollback
+     *                              process, indicating a failure to achieve consistency.
+     */
+    void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException;
+
+    /**
+     * determines if the indexes need to be recovered lazily at the time of their first access
+     * @return
+     */
+    boolean isLazyRecoveryEnabled();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 28fd27e..2fdfba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -139,6 +139,13 @@
         return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
     }
 
+    public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath)
+            throws HyracksDataException {
+        int separatorIndex = relativePath.lastIndexOf(File.separatorChar);
+        String parentDirectory = relativePath.substring(0, separatorIndex);
+        return ioManager.resolve(parentDirectory);
+    }
+
     /**
      * Create a file
      * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
@@ -229,7 +236,17 @@
     }
 
     public static boolean isRelativeParent(FileReference parent, FileReference child) {
-        return child.getRelativePath().startsWith(parent.getRelativePath());
+        String childPath = child.getRelativePath();
+        String parentPath = parent.getRelativePath();
+        boolean isMatch = childPath.startsWith(parentPath);
+        if (isMatch) {
+            int parentPathLength = parentPath.length();
+            if (childPath.length() == parentPathLength) {
+                return true;
+            }
+            return childPath.charAt(parentPathLength) == File.separatorChar;
+        }
+        return false;
     }
 
     public static String getNamespacePath(INamespacePathResolver nsPathResolver, Namespace namespace, int partition) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 5c7d5ac..cb4e068 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -251,6 +251,7 @@
         return ioManager.resolve(fileName);
     }
 
+    @Override
     public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots)
             throws HyracksDataException {
         beforeReadAccess();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java
index 491d476..46eb1d5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java
@@ -18,7 +18,12 @@
  */
 package org.apache.hyracks.storage.common;
 
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 
 public interface ILocalResourceRepository {
 
@@ -28,5 +33,8 @@
 
     void delete(String name) throws HyracksDataException;
 
+    Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> resourceFolderRoot)
+            throws HyracksDataException;
+
     long maxId() throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java
index 2e756ea..5f2ccc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java
@@ -19,9 +19,12 @@
 package org.apache.hyracks.storage.common;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
 
 public class TransientLocalResourceRepository implements ILocalResourceRepository {
 
@@ -55,6 +58,12 @@
     }
 
     @Override
+    public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots)
+            throws HyracksDataException {
+        return Map.of();
+    }
+
+    @Override
     public long maxId() throws HyracksDataException {
         long maxResourceId = 0;