[ASTERIXDB-3574][STO] Take resource-level locks instead of a global lock

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Currently, all dataset and index lifecycle operations (`create`,
`register`, `open`, `close`, and `unregister`) in
`DatasetLifecycleManager` are serialized on a single global lock
(the manager's monitor), which causes contention and limits
concurrency.

To improve performance, the global lock is replaced with
resource-level locks where applicable, so operations on different
datasets and indexes can proceed in parallel. A global read-write
lock (`stopLock`) is retained only to fence in-flight operations
against `stop()`.
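
In outline, registration now uses double-checked locking against a
per-resource lock. A simplified sketch of the new flow (stopLock
fencing, validation, and notifier calls omitted):

    public IIndex registerIfAbsent(String resourcePath, IIndex index)
            throws HyracksDataException {
        IIndex existing = get(resourcePath);      // fast path, lock-free
        if (existing != null) {
            return existing;
        }
        ReentrantReadWriteLock lock = getResourceLock(resourcePath);
        lock.writeLock().lock();
        try {
            existing = get(resourcePath);         // re-check under the lock
            if (existing != null) {
                return existing;
            }
            if (index == null) {
                // instantiate from the stored local resource metadata
                index = getOrCreateIndex(resourcePath);
            }
            // register the index with its DatasetResource
        } finally {
            lock.writeLock().unlock();
        }
        return index;
    }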

Ext-ref: MB-65695
Change-Id: I9e3b931b363f082f0a7c69a0454adfd37937bb60
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19484
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 269f6f7..5035d25 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -284,7 +284,6 @@
         long lsn = -1;
         ILSMIndex index = null;
         LocalResource localResource = null;
-        DatasetLocalResource localResourceMetadata = null;
         boolean foundWinner = false;
         JobEntityCommits jobEntityWinners = null;
 
@@ -354,12 +353,11 @@
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
                             //get partition path in this node
-                            localResourceMetadata = (DatasetLocalResource) localResource.getResource();
                             index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
-                                index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
-                                datasetLifecycleManager.register(localResource.getPath(), index);
+                                index = (ILSMIndex) datasetLifecycleManager.registerIfAbsent(localResource.getPath(),
+                                        null);
                                 datasetLifecycleManager.open(localResource.getPath());
                                 try {
                                     maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
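
The recovery path previously created the index instance itself and
then registered it, a check-then-act sequence that is racy once the
global lock is gone. Condensed before/after of the hunk above:

    // before: two steps, racy without a global lock
    index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
    datasetLifecycleManager.register(localResource.getPath(), index);

    // after: atomic create-or-return under the per-resource lock;
    // passing null lets the manager instantiate from the local resource
    index = (ILSMIndex) datasetLifecycleManager.registerIfAbsent(
            localResource.getPath(), null);
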
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 952185b..7f3642b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -19,10 +19,10 @@
 package org.apache.asterix.common.context;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.api.IIOBlockingOperation;
 import org.apache.asterix.common.transactions.ILogManager;
@@ -57,8 +57,8 @@
     private boolean durable;
 
     public DatasetInfo(int datasetID, ILogManager logManager) {
-        this.partitionIndexes = new HashMap<>();
-        this.indexes = new HashMap<>();
+        this.partitionIndexes = new ConcurrentHashMap<>();
+        this.indexes = new ConcurrentHashMap<>();
         this.partitionPendingIO = new Int2IntOpenHashMap();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
@@ -200,13 +200,13 @@
         return Collections.unmodifiableMap(indexes);
     }
 
-    public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
+    public void addIndex(long resourceID, IndexInfo indexInfo) {
         indexes.put(resourceID, indexInfo);
         partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
         LOGGER.debug("registered reference to index {}", indexInfo);
     }
 
-    public synchronized void removeIndex(long resourceID) {
+    public void removeIndex(long resourceID) {
         IndexInfo info = indexes.remove(resourceID);
         if (info != null) {
             partitionIndexes.get(info.getPartition()).remove(info);
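
With `synchronized` removed from addIndex/removeIndex, atomicity is
per map key: `computeIfAbsent` on a `ConcurrentHashMap` runs the
mapping function at most once per key, so racing threads observe the
same per-partition set. The `HashSet` values themselves are not
thread-safe; the assumption here is that mutations for a given
resource are already serialized by the callers' resource-level locks.
A minimal standalone illustration of the per-key guarantee:

    ConcurrentHashMap<Integer, Set<IndexInfo>> partitionIndexes =
            new ConcurrentHashMap<>();
    // Two racing threads get the same Set instance for a partition...
    partitionIndexes.computeIfAbsent(partition, p -> new HashSet<>())
            .add(indexInfo);
    // ...but add/remove on that set must still be mutually excluded
    // by a higher-level lock, e.g. the per-resource write lock.
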
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 85916b0..c6edb4e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,10 +23,13 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
@@ -62,13 +65,16 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
 import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+@ThreadSafe
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -82,6 +88,8 @@
     private final LogRecord waitLog;
     protected final IDiskResourceCacheLockNotifier lockNotifier;
     private volatile boolean stopped = false;
+    private final ReentrantReadWriteLock stopLock;
+    private final Map<String, ReentrantReadWriteLock> resourceLocks;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     // all LSM-trees share the same virtual buffer cache list
     private final List<IVirtualBufferCache> vbcs;
@@ -96,6 +104,7 @@
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
         this.vbc = vbc;
+        this.stopLock = new ReentrantReadWriteLock();
         int numMemoryComponents = storageProperties.getMemoryComponentsNum();
         this.vbcs = new ArrayList<>(numMemoryComponents);
         for (int i = 0; i < numMemoryComponents; i++) {
@@ -103,13 +112,14 @@
         }
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.lockNotifier = lockNotifier;
+        this.resourceLocks = Collections.synchronizedMap(new WeakHashMap<>());
         waitLog = new LogRecord();
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
     }
 
     @Override
-    public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
+    public ILSMIndex get(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -117,7 +127,7 @@
     }
 
     @Override
-    public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+    public ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         DatasetResource datasetResource = datasets.get(datasetID);
         if (datasetResource == null) {
@@ -127,16 +137,52 @@
     }
 
     @Override
-    public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
-        validateDatasetLifecycleManagerState();
-        int did = getDIDfromResourcePath(resourcePath);
-        LocalResource resource = resourceRepository.get(resourcePath);
-        DatasetResource datasetResource = datasets.get(did);
-        lockNotifier.onRegister(resource, index);
-        if (datasetResource == null) {
-            datasetResource = getDatasetLifecycle(did);
+    public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
+        stopLock.readLock().lock();
+        try {
+            validateDatasetLifecycleManagerState();
+
+            IIndex existingIndex = get(resourcePath);
+            if (existingIndex != null) {
+                return existingIndex;
+            }
+
+            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+            resourceLock.writeLock().lock();
+            try {
+                existingIndex = get(resourcePath);
+                if (existingIndex != null) {
+                    return existingIndex;
+                }
+
+                if (index == null) {
+                    index = getOrCreateIndex(resourcePath);
+                }
+
+                int datasetID = getDIDfromResourcePath(resourcePath);
+                LocalResource resource = resourceRepository.get(resourcePath);
+                lockNotifier.onRegister(resource, index);
+
+                DatasetResource datasetResource = datasets.get(datasetID);
+                if (datasetResource == null) {
+                    datasetResource = getDatasetLifecycle(datasetID);
+                }
+
+                datasetResource.register(resource, (ILSMIndex) index);
+            } finally {
+                resourceLock.writeLock().unlock();
+            }
+        } finally {
+            stopLock.readLock().unlock();
         }
-        datasetResource.register(resource, (ILSMIndex) index);
+
+        return index;
+    }
+
+    private ReentrantReadWriteLock getResourceLock(String resourcePath) {
+        // Create fair locks to avoid starvation; since a separate lock is
+        // created per resource, starvation should be rare in practice.
+        return resourceLocks.computeIfAbsent(resourcePath, k -> new ReentrantReadWriteLock(true));
     }
 
     protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
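
A note on the lock pool above: wrapping a WeakHashMap in
Collections.synchronizedMap keeps one fair ReentrantReadWriteLock per
live resource path while allowing entries for paths that are no
longer referenced to be garbage collected. A caller of
getResourceLock holds a strong reference to the returned lock, so an
in-use lock cannot be collected. Sketch of the intended lifetime
(illustrative; real keys are runtime path strings, not literals):

    Map<String, ReentrantReadWriteLock> locks =
            Collections.synchronizedMap(new WeakHashMap<>());
    ReentrantReadWriteLock lock = locks.computeIfAbsent(resourcePath,
            k -> new ReentrantReadWriteLock(true)); // fair mode
    lock.writeLock().lock();
    try {
        // critical section; the local reference keeps the entry alive
    } finally {
        lock.writeLock().unlock();
    }
    // once no thread references resourcePath or the lock, the map
    // entry becomes eligible for collection
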
@@ -163,115 +209,261 @@
         return (DatasetLocalResource) lr.getResource();
     }
 
+    /**
+     * Concurrency considerations for dataset operations:
+     *
+     * This method requires dataset locks to handle concurrent operations:
+     *
+     * 1. Dataset-level locking:
+     *    - The open() method works on all indexes of a dataset.
+     *    - The first-time open() call triggers recoverIndex(), which acquires the dataset lock.
+     *    - Race conditions may occur if index recovery happens while an index is being dropped.
+     *
+     * 2. Lock acquisition strategy:
+     *    - Both the dataset lock and the local resource lock must be acquired.
+     *    - This prevents register/open operations from occurring while an unregister operation is in progress.
+     *    - These locks must be held until the unregister operation completes.
+     *
+     * 3. Possible race scenarios:
+     *    Consider the following threads:
+     *      - t1: unregister(ds/0/idx1)
+     *      - t2: open(ds/0/idx1)
+     *      - t3: unregister(ds/0/idx2)
+     *      - t4: unregister(newDs/0/idx)
+     *    - If t2 is running, t1 and t3 should wait (same dataset), but t4 can proceed (different dataset).
+     *    - If t2 hasn't started yet (dataset lock not acquired), t1 and t3 could execute.
+     *
+     * 4. Race condition handling:
+     *    - If t1 starts unregistering, t2 starts opening, and t3 starts unregistering simultaneously:
+     *      - t2 takes the dataset lock.
+     *      - Depending on the timing of registration completion, the resource repository may or may not contain the index.
+     *      - If the index does not exist, an INDEX_DOES_NOT_EXIST error will occur.
+     *
+     * Note: At the compilation layer, exclusive dataset locks prevent DROP/UNREGISTER operations
+     *       from colliding with OPEN/REGISTER operations.
+     */
     @Override
-    public synchronized void unregister(String resourcePath) throws HyracksDataException {
-        validateDatasetLifecycleManagerState();
-        int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
-        DatasetResource dsr = datasets.get(did);
-        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
-
-        if (dsr == null || iInfo == null) {
-            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
-        }
-
-        lockNotifier.onUnregister(resourceID);
-        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
-        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
-            if (LOGGER.isErrorEnabled()) {
-                final String logMsg = String.format(
-                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
-                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
-                LOGGER.error(logMsg);
-            }
-            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
-                    StoragePathUtil.getIndexNameFromPath(resourcePath));
-        }
-
-        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
-        DatasetInfo dsInfo = dsr.getDatasetInfo();
-        dsInfo.waitForIO();
-        closeIndex(iInfo);
-        dsInfo.removeIndex(resourceID);
-        synchronized (dsInfo) {
-            int referenceCount = dsInfo.getReferenceCount();
-            boolean open = dsInfo.isOpen();
-            boolean empty = dsInfo.getIndexes().isEmpty();
-            if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
-                LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
-                removeDatasetFromCache(dsInfo.getDatasetID());
-            } else {
-                LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
-                        dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
-            }
-        }
-    }
-
-    @Override
-    public synchronized void open(String resourcePath) throws HyracksDataException {
-        validateDatasetLifecycleManagerState();
-        DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
-        if (localResource == null) {
-            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
-        }
-        int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
-
-        lockNotifier.onOpen(resourceID);
+    public void unregister(String resourcePath) throws HyracksDataException {
+        stopLock.readLock().lock();
         try {
-            DatasetResource datasetResource = datasets.get(did);
-            int partition = localResource.getPartition();
-            if (shouldRecoverLazily(datasetResource, partition)) {
-                performLocalRecovery(resourcePath, datasetResource, partition);
-            } else {
-                openResource(resourcePath, false);
+            validateDatasetLifecycleManagerState();
+            String datasetPartitionPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
+
+            ReentrantReadWriteLock partitionResourceLock = getResourceLock(datasetPartitionPath);
+            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+            partitionResourceLock.writeLock().lock();
+            try {
+                resourceLock.writeLock().lock();
+                try {
+                    int did = getDIDfromResourcePath(resourcePath);
+                    long resourceID = getResourceIDfromResourcePath(resourcePath);
+                    DatasetResource dsr = datasets.get(did);
+                    IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+                    if (dsr == null || iInfo == null) {
+                        throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+                    }
+
+                    lockNotifier.onUnregister(resourceID);
+                    PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+                    if (iInfo.getReferenceCount() != 0
+                            || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+                        if (LOGGER.isErrorEnabled()) {
+                            final String logMsg = String.format(
+                                    "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                                    resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                            LOGGER.error(logMsg);
+                        }
+                        throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                                StoragePathUtil.getIndexNameFromPath(resourcePath));
+                    }
+
+                    // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+                    DatasetInfo dsInfo = dsr.getDatasetInfo();
+                    dsInfo.waitForIO();
+                    closeIndex(iInfo);
+                    dsInfo.removeIndex(resourceID);
+                    synchronized (dsInfo) {
+                        int referenceCount = dsInfo.getReferenceCount();
+                        boolean open = dsInfo.isOpen();
+                        boolean empty = dsInfo.getIndexes().isEmpty();
+                        if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
+                            LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
+                            removeDatasetFromCache(dsInfo.getDatasetID());
+                        } else {
+                            LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
+                                    dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
+                        }
+                    }
+                } finally {
+                    resourceLock.writeLock().unlock();
+                }
+            } finally {
+                partitionResourceLock.writeLock().unlock();
             }
         } finally {
-            lockNotifier.onClose(resourceID);
+            stopLock.readLock().unlock();
         }
     }
 
-    private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
-            throws HyracksDataException {
-        LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
-                partition);
-        FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath);
-        Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
+    @Override
+    public void destroy(String resourcePath) throws HyracksDataException {
+        stopLock.readLock().lock();
+        try {
+            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+            resourceLock.writeLock().lock();
+            try {
+                LOGGER.info("Dropping index {} on node {}", resourcePath, serviceCtx.getNodeId());
+                IIndex index = get(resourcePath);
+                if (index != null) {
+                    unregister(resourcePath);
+                } else {
+                    index = readIndex(resourcePath);
+                }
+                if (getResourceId(resourcePath) != -1) {
+                    resourceRepository.delete(resourcePath);
+                }
+                index.destroy();
+            } finally {
+                resourceLock.writeLock().unlock();
+            }
+        } finally {
+            stopLock.readLock().unlock();
+        }
+    }
 
-        List<ILSMIndex> indexes = new ArrayList<>();
-        for (LocalResource resource : resources.values()) {
-            if (shouldSkipResource(resource)) {
-                continue;
+    private long getResourceId(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.get(resourcePath);
+        return lr == null ? -1 : lr.getId();
+    }
+
+    @Override
+    public void open(String resourcePath) throws HyracksDataException {
+        stopLock.readLock().lock();
+        try {
+            validateDatasetLifecycleManagerState();
+
+            DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
+            if (localResource == null) {
+                throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
             }
 
-            ILSMIndex index = getOrCreateIndex(resource);
-            boolean undoTouch = !resourcePath.equals(resource.getPath());
-            openResource(resource.getPath(), undoTouch);
-            indexes.add(index);
-        }
+            int did = getDIDfromResourcePath(resourcePath);
+            long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        if (!indexes.isEmpty()) {
-            recoveryMgr.recoverIndexes(indexes);
-        }
+            // Notify before opening a resource
+            lockNotifier.onOpen(resourceID);
+            try {
+                DatasetResource datasetResource = datasets.get(did);
+                int partition = localResource.getPartition();
+                boolean lazyRecover = shouldRecoverLazily(datasetResource, partition);
 
-        datasetResource.markRecovered(partition);
+                if (!lazyRecover) {
+                    openResource(resourcePath, false);
+                    return;
+                }
+
+                // Perform local recovery by taking a lock on the root resource
+                boolean recoveredByCurrentThread = performLocalRecovery(resourcePath, datasetResource, partition);
+
+                /*
+                 * Concurrent Access Scenario for Index Resource Recovery:
+                 * ------------------------------------------------------
+                 * When multiple threads attempt to open the same index resource, a race
+                 * condition can occur.
+                 *
+                 * Example:
+                 * - Thread1 (handling ds/0/idx1) and Thread2 (handling ds/0/idx2) may both
+                 *   try to recover dataset indexes.
+                 * - Thread1 enters `ensureIndexOpenAndConsistent()`, detects that the resource
+                 *   is not recovered, and starts recovery.
+                 * - Before Thread1 marks recovery as complete, Thread2 also checks and finds
+                 *   the resource not recovered, so it attempts recovery too.
+                 *
+                 * However, index recovery is an expensive operation and should be performed by
+                 * only one thread. To prevent multiple recoveries, `ensureIndexOpenAndConsistent()`
+                 * must be synchronized on the root dataset resource. This ensures that only one
+                 * thread performs the recovery, while others wait.
+                 *
+                 * Behavior After Synchronization:
+                 * - Thread1 recovers and marks the index as recovered.
+                 * - Thread2, after acquiring the lock, sees that the index is already recovered.
+                 * - It returns `false` from `ensureIndexOpenAndConsistent()` and proceeds to call
+                 *   `open()`.
+                 * - Since `open()` is idempotent, if the index is already open (`iInfo.isOpen()`),
+                 *   it does nothing except incrementing open stats.
+                 */
+                if (!recoveredByCurrentThread) {
+                    openResource(resourcePath, false);
+                }
+            } finally {
+                lockNotifier.onClose(resourceID);
+            }
+        } finally {
+            stopLock.readLock().unlock();
+        }
     }
 
-    private boolean shouldSkipResource(LocalResource resource) {
+    private boolean performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
+            throws HyracksDataException {
+
+        String indexRootRefPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
+        ReentrantReadWriteLock resourceLock = getResourceLock(indexRootRefPath);
+        FileReference indexRootRef = serviceCtx.getIoManager().resolve(indexRootRefPath);
+        resourceLock.writeLock().lock();
+        try {
+            if (!shouldRecoverLazily(datasetResource, partition)) {
+                return false;
+            }
+            LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
+                    partition);
+            Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
+
+            List<ILSMIndex> indexes = new ArrayList<>();
+            for (LocalResource resource : resources.values()) {
+                if (shouldSkipRecoveringResource(resource)) {
+                    continue;
+                }
+
+                ILSMIndex index = (ILSMIndex) registerIfAbsent(resource.getPath(), null);
+                boolean undoTouch = !resourcePath.equals(resource.getPath());
+                openResource(resource.getPath(), undoTouch);
+                indexes.add(index);
+            }
+
+            if (!indexes.isEmpty()) {
+                recoveryMgr.recoverIndexes(indexes);
+            }
+
+            datasetResource.markRecovered(partition);
+            return true;
+        } finally {
+            resourceLock.writeLock().unlock();
+        }
+    }
+
+    private boolean shouldSkipRecoveringResource(LocalResource resource) {
         DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
         return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
                 || (lr.getResource() instanceof LSMBTreeLocalResource
                         && ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance());
     }
 
-    private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException {
-        ILSMIndex index = get(resource.getPath());
-        if (index == null) {
-            DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
-            index = (ILSMIndex) lr.createInstance(serviceCtx);
-            register(resource.getPath(), index);
+    private IIndex getOrCreateIndex(String resourcePath) throws HyracksDataException {
+        IIndex index = get(resourcePath);
+        if (index != null) {
+            return index;
         }
-        return index;
+        return readIndex(resourcePath);
+    }
+
+    private IIndex readIndex(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.get(resourcePath);
+        if (lr == null) {
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+        }
+        IResource resource = lr.getResource();
+        return resource.createInstance(serviceCtx);
     }
 
     private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException {
@@ -300,11 +492,15 @@
         boolean indexTouched = false;
         try {
             if (!iInfo.isOpen()) {
-                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
-                synchronized (opTracker) {
-                    iInfo.getIndex().activate();
+                synchronized (iInfo) {
+                    if (!iInfo.isOpen()) {
+                        ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
+                        synchronized (opTracker) {
+                            iInfo.getIndex().activate();
+                        }
+                        iInfo.setOpen(true);
+                    }
                 }
-                iInfo.setOpen(true);
             }
             iInfo.touch();
             indexTouched = true;
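
The activation hunk above is a double-checked, idempotent open: the
unsynchronized isOpen() test is the fast path, and the state is
re-checked under iInfo's monitor before activating, so concurrent
openers activate the index exactly once. Condensed:

    if (!iInfo.isOpen()) {                        // fast path
        synchronized (iInfo) {
            if (!iInfo.isOpen()) {                // re-check under monitor
                synchronized (iInfo.getIndex().getOperationTracker()) {
                    iInfo.getIndex().activate();  // activate exactly once
                }
                iInfo.setOpen(true);              // publish after success
            }
        }
    }
    iInfo.touch();                                // every opener is counted
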
@@ -334,15 +530,7 @@
         if (dsr != null) {
             return dsr;
         }
-        synchronized (datasets) {
-            dsr = datasets.get(did);
-            if (dsr == null) {
-                DatasetInfo dsInfo = new DatasetInfo(did, logManager);
-                dsr = new DatasetResource(dsInfo);
-                datasets.put(did, dsr);
-            }
-            return dsr;
-        }
+        return datasets.computeIfAbsent(did, k -> new DatasetResource(new DatasetInfo(did, logManager)));
     }
 
     @Override
@@ -351,46 +539,62 @@
     }
 
     @Override
-    public synchronized void close(String resourcePath) throws HyracksDataException {
-        DatasetResource dsr = null;
-        IndexInfo iInfo = null;
+    public void close(String resourcePath) throws HyracksDataException {
+        stopLock.readLock().lock();
         try {
-            validateDatasetLifecycleManagerState();
-            int did = getDIDfromResourcePath(resourcePath);
-            long resourceID = getResourceIDfromResourcePath(resourcePath);
-            dsr = datasets.get(did);
-            if (dsr == null) {
-                throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+            DatasetResource dsr = null;
+            IndexInfo iInfo = null;
+
+            // The resource lock is needed only to guard against a concurrent unregister.
+            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+            resourceLock.writeLock().lock();
+            try {
+                validateDatasetLifecycleManagerState();
+
+                int did = getDIDfromResourcePath(resourcePath);
+                long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+                dsr = datasets.get(did);
+                if (dsr == null) {
+                    throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+                }
+
+                iInfo = dsr.getIndexInfo(resourceID);
+                if (iInfo == null) {
+                    throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+                }
+
+                lockNotifier.onClose(resourceID);
+            } finally {
+                // Regardless of any exceptions thrown in the try block (e.g., missing index),
+                // we must ensure that the index and dataset are marked as untouched.
+                if (iInfo != null) {
+                    iInfo.untouch();
+                }
+                if (dsr != null) {
+                    dsr.untouch();
+                }
+                resourceLock.writeLock().unlock();
             }
-            iInfo = dsr.getIndexInfo(resourceID);
-            if (iInfo == null) {
-                throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
-            }
-            lockNotifier.onClose(resourceID);
         } finally {
-            // Regardless of what exception is thrown in the try-block (e.g., line 279),
-            // we have to un-touch the index and dataset.
-            if (iInfo != null) {
-                iInfo.untouch();
-            }
-            if (dsr != null) {
-                dsr.untouch();
-            }
+            stopLock.readLock().unlock();
         }
     }
 
     @Override
-    public synchronized List<IIndex> getOpenResources() {
-        List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
-        List<IIndex> openIndexes = new ArrayList<>();
-        for (IndexInfo iInfo : openIndexesInfo) {
-            openIndexes.add(iInfo.getIndex());
+    public List<IIndex> getOpenResources() {
+        synchronized (datasets) {
+            List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
+            List<IIndex> openIndexes = new ArrayList<>();
+            for (IndexInfo iInfo : openIndexesInfo) {
+                openIndexes.add(iInfo.getIndex());
+            }
+            return openIndexes;
         }
-        return openIndexes;
     }
 
     @Override
-    public synchronized List<IndexInfo> getOpenIndexesInfo() {
+    public List<IndexInfo> getOpenIndexesInfo() {
         List<IndexInfo> openIndexesInfo = new ArrayList<>();
         for (DatasetResource dsr : datasets.values()) {
             for (IndexInfo iInfo : dsr.getIndexes().values()) {
@@ -412,27 +616,50 @@
     }
 
     @Override
-    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
+    public PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String resourcePath) {
         DatasetResource dataset = getDatasetLifecycle(datasetId);
         PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
-        if (opTracker == null) {
-            populateOpTrackerAndIdGenerator(dataset, partition, path);
-            opTracker = dataset.getOpTracker(partition);
+        if (opTracker != null) {
+            return opTracker;
         }
-        return opTracker;
+        ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+        resourceLock.writeLock().lock();
+        try {
+            opTracker = dataset.getOpTracker(partition);
+            if (opTracker != null) {
+                return opTracker;
+            }
+            populateOpTrackerAndIdGenerator(dataset, partition, resourcePath);
+            opTracker = dataset.getOpTracker(partition);
+            return opTracker;
+        } finally {
+            resourceLock.writeLock().unlock();
+        }
     }
 
     @Override
-    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
+    public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
         DatasetResource dataset = datasets.get(datasetId);
         ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
-        if (generator == null) {
+        if (generator != null) {
+            return generator;
+        }
+        ReentrantReadWriteLock resourceLock = getResourceLock(path);
+        resourceLock.writeLock().lock();
+        try {
+            generator = dataset.getComponentIdGenerator(partition);
+            if (generator != null) {
+                return generator;
+            }
             populateOpTrackerAndIdGenerator(dataset, partition, path);
             generator = dataset.getComponentIdGenerator(partition);
+            return generator;
+        } finally {
+            resourceLock.writeLock().unlock();
         }
-        return generator;
     }
 
+    // Note: this method is currently unused.
     @Override
     public synchronized IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit) {
         DatasetResource dataset = datasets.get(datasetId);
@@ -444,7 +671,7 @@
     }
 
     @Override
-    public synchronized boolean isRegistered(int datasetId) {
+    public boolean isRegistered(int datasetId) {
         return datasets.containsKey(datasetId);
     }
 
@@ -476,34 +703,39 @@
     }
 
     @Override
-    public synchronized void flushAllDatasets() throws HyracksDataException {
+    public void flushAllDatasets() throws HyracksDataException {
         flushAllDatasets(partition -> true);
     }
 
     @Override
-    public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
-        for (DatasetResource dsr : datasets.values()) {
-            if (dsr.getDatasetInfo().isOpen()) {
-                flushDatasetOpenIndexes(dsr, partitions, false);
+    public void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
+        synchronized (datasets) {
+            for (DatasetResource dsr : datasets.values()) {
+                if (dsr.getDatasetInfo().isOpen()) {
+                    flushDatasetOpenIndexes(dsr, partitions, false);
+                }
             }
         }
     }
 
     @Override
-    public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
-        DatasetResource dsr = datasets.get(datasetId);
-        if (dsr != null) {
-            flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
+    public void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
+        synchronized (datasets) {
+            DatasetResource dsr = datasets.get(datasetId);
+            if (dsr != null) {
+                flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
+            }
         }
     }
 
     @Override
-    public synchronized void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate)
-            throws HyracksDataException {
-        for (DatasetResource dsr : datasets.values()) {
-            for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
-                synchronized (opTracker) {
-                    asyncFlush(dsr, opTracker, indexPredicate);
+    public void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) throws HyracksDataException {
+        synchronized (datasets) {
+            for (DatasetResource dsr : datasets.values()) {
+                for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+                    synchronized (opTracker) {
+                        asyncFlush(dsr, opTracker, indexPredicate);
+                    }
                 }
             }
         }
@@ -594,38 +826,45 @@
     }
 
     @Override
-    public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
-        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
-        for (DatasetResource dsr : openDatasets) {
-            if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
-                closeDataset(dsr);
+    public void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+        synchronized (datasets) {
+            for (DatasetResource dsr : datasets.values()) {
+                if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
+                    closeDataset(dsr);
+                }
             }
         }
     }
 
     @Override
-    public synchronized void closeAllDatasets() throws HyracksDataException {
-        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
-        for (DatasetResource dsr : openDatasets) {
-            if (dsr.isOpen()) {
-                closeDataset(dsr);
+    public void closeAllDatasets() throws HyracksDataException {
+        synchronized (datasets) {
+            for (DatasetResource dsr : datasets.values()) {
+                if (dsr.isOpen()) {
+                    closeDataset(dsr);
+                }
             }
         }
     }
 
     @Override
     public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
-        if (stopped) {
-            return;
-        }
-        if (dumpState) {
-            dumpState(outputStream);
-        }
+        stopLock.writeLock().lock();
+        try {
+            if (stopped) {
+                return;
+            }
+            if (dumpState) {
+                dumpState(outputStream);
+            }
 
-        closeAllDatasets();
+            closeAllDatasets();
 
-        datasets.clear();
-        stopped = true;
+            datasets.clear();
+            stopped = true;
+        } finally {
+            stopLock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -665,9 +904,11 @@
     @Override
     public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
             throws HyracksDataException {
-        for (DatasetResource dsr : datasets.values()) {
-            if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                flushDatasetOpenIndexes(dsr, partitions, false);
+        synchronized (datasets) {
+            for (DatasetResource dsr : datasets.values()) {
+                if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
+                    flushDatasetOpenIndexes(dsr, partitions, false);
+                }
             }
         }
     }
@@ -720,47 +961,60 @@
 
     //TODO refactor this method with unregister method
     @Override
-    public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
-        validateDatasetLifecycleManagerState();
-        int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
+    public void closeIfOpen(String resourcePath) throws HyracksDataException {
+        stopLock.readLock().lock();
+        try {
+            validateDatasetLifecycleManagerState();
+            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+            resourceLock.writeLock().lock();
+            try {
+                int did = getDIDfromResourcePath(resourcePath);
+                long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-        DatasetResource dsr = datasets.get(did);
-        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+                DatasetResource dsr = datasets.get(did);
+                IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
-        if (dsr == null || iInfo == null) {
-            return;
-        }
+                if (dsr == null || iInfo == null) {
+                    return;
+                }
 
-        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
-        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
-            if (LOGGER.isErrorEnabled()) {
-                final String logMsg = String.format(
-                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
-                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
-                LOGGER.error(logMsg);
+                PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+                if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+                    if (LOGGER.isErrorEnabled()) {
+                        final String logMsg = String.format(
+                                "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                                resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                        LOGGER.error(logMsg);
+                    }
+                    throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                            StoragePathUtil.getIndexNameFromPath(resourcePath));
+                }
+
+                // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+                DatasetInfo dsInfo = dsr.getDatasetInfo();
+                dsInfo.waitForIO();
+                closeIndex(iInfo);
+                dsInfo.removeIndex(resourceID);
+                synchronized (dsInfo) {
+                    if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                            && !dsInfo.isExternal()) {
+                        removeDatasetFromCache(dsInfo.getDatasetID());
+                    }
+                }
+            } finally {
+                resourceLock.writeLock().unlock();
             }
-            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
-                    StoragePathUtil.getIndexNameFromPath(resourcePath));
-        }
-
-        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
-        DatasetInfo dsInfo = dsr.getDatasetInfo();
-        dsInfo.waitForIO();
-        closeIndex(iInfo);
-        dsInfo.removeIndex(resourceID);
-        synchronized (dsInfo) {
-            if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
-                    && !dsInfo.isExternal()) {
-                removeDatasetFromCache(dsInfo.getDatasetID());
-            }
+        } finally {
+            stopLock.readLock().unlock();
         }
     }
 
     @Override
-    public synchronized void closePartition(int partitionId) {
-        for (DatasetResource ds : datasets.values()) {
-            ds.removePartition(partitionId);
+    public void closePartition(int partitionId) {
+        synchronized (datasets) {
+            for (DatasetResource ds : datasets.values()) {
+                ds.removePartition(partitionId);
+            }
         }
     }
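
One invariant that spans the file above: whenever both levels are
needed, the dataset-partition lock is acquired before the individual
resource lock (unregister does this explicitly, and
performLocalRecovery takes the same partition-level lock). A single,
consistent acquisition order across call sites is what rules out
deadlock between concurrent unregister/open/recovery. The discipline,
in outline:

    ReentrantReadWriteLock partitionLock = getResourceLock(partitionPath);
    ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
    partitionLock.writeLock().lock();        // coarser lock first
    try {
        resourceLock.writeLock().lock();     // then the finer lock
        try {
            // drop or recover the index
        } finally {
            resourceLock.writeLock().unlock();
        }
    } finally {
        partitionLock.writeLock().unlock();  // release in reverse order
    }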
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8e3081d..0c8f141 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -19,10 +19,10 @@
 package org.apache.asterix.common.context;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -54,9 +54,9 @@
 
     public DatasetResource(DatasetInfo datasetInfo) {
         this.datasetInfo = datasetInfo;
-        this.datasetPrimaryOpTrackers = new HashMap<>();
-        this.datasetComponentIdGenerators = new HashMap<>();
-        this.datasetRateLimiters = new HashMap<>();
+        this.datasetPrimaryOpTrackers = new ConcurrentHashMap<>();
+        this.datasetComponentIdGenerators = new ConcurrentHashMap<>();
+        this.datasetRateLimiters = new ConcurrentHashMap<>();
         this.recoveredPartitions = new HashSet<>();
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 2fdfba4..9f0f5c7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -139,11 +139,9 @@
         return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
     }
 
-    public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath)
-            throws HyracksDataException {
+    public static String getDatasetPartitionPath(String relativePath) {
         int separatorIndex = relativePath.lastIndexOf(File.separatorChar);
-        String parentDirectory = relativePath.substring(0, separatorIndex);
-        return ioManager.resolve(parentDirectory);
+        return relativePath.substring(0, separatorIndex);
     }
 
     /**
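
For illustration, the renamed helper now returns the parent (dataset
partition) directory of a relative index path as a plain string,
leaving resolution to the caller; the path below is hypothetical and
assumes '/' as the separator:

    String idxPath = "storage/partition_0/Default/SDefault/ds0/0/idx1";
    String partitionPath = StoragePathUtil.getDatasetPartitionPath(idxPath);
    // partitionPath == "storage/partition_0/Default/SDefault/ds0/0"
    FileReference root = ioManager.resolve(partitionPath); // when needed
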
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
new file mode 100644
index 0000000..d15a9d4
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
@@ -0,0 +1,2104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Concurrent tests for the DatasetLifecycleManager class.
+ *
+ * This test class verifies that the DatasetLifecycleManager properly handles
+ * concurrent register, open, close, and unregister operations on indexes.
+ *
+ * The tests use mock objects to simulate the index lifecycle and verify:
+ * 1. Thread safety of operations
+ * 2. Correct handling of re-registration attempts
+ * 3. Proper activation/deactivation during open/close
+ * 4. Correct identity preservation for index instances
+ * 5. Concurrent access to shared resources
+ *
+ * Each test method focuses on a specific aspect of concurrent behavior:
+ * - testConcurrentRegisterAndOpen: Tests register and open operations
+ * - testConcurrentRegisterOpenAndUnregister: Tests the full lifecycle
+ * - testRegisterAlreadyRegisteredIndex: Tests re-registration behavior
+ * - testConcurrentOpenSameIndex: Tests opening the same index concurrently
+ * - testConcurrentCloseAfterOpen: Tests closing after open operations
+ * - testMockIndexIdentity: Tests the mock setup itself
+ */
+public class DatasetLifecycleManagerConcurrentTest {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    // Configuration constants
+    private static final int NUM_DATASETS = 3;
+    private static final int NUM_PARTITIONS = 2;
+    private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
+    private static final int NUM_OPERATIONS_PER_THREAD = 20;
+
+    // For resource paths
+    private static final String DB_NAME = "Default";
+    private static final String SCOPE_NAME = "SDefault";
+    private static final String STORAGE_DIR = "storage";
+
+    private DatasetLifecycleManager datasetLifecycleManager;
+    private INCServiceContext mockServiceContext;
+    private StorageProperties mockStorageProperties;
+    private ILocalResourceRepository mockResourceRepository;
+    private IRecoveryManager mockRecoveryManager;
+    private ILogManager mockLogManager;
+    private IVirtualBufferCache mockVBC;
+    private IIndexCheckpointManagerProvider mockCheckpointProvider;
+    private IDiskResourceCacheLockNotifier mockLockNotifier;
+    private IIOManager mockIOManager;
+
+    private Map<String, MockIndex> mockIndexes;
+    private Map<String, LocalResource> mockResources;
+
+    private ExecutorService executorService;
+    private List<String> createdResourcePaths;
+
+    private Map<Integer, DatasetResource> datasetResourceMap;
+
+    // Controls whether the mock recovery manager reports lazy recovery as enabled
+    private boolean lazyRecoveryEnabled = false;
+
+    /**
+     * Sets up the test environment with mock objects and resources.
+     * This includes:
+     * 1. Creating mock service context and other components
+     * 2. Setting up mock resources with unique IDs
+     * 3. Creating the DatasetLifecycleManager with mock dependencies
+     * 4. Setting up a thread pool for concurrent operations
+     */
+    @Before
+    public void setUp() throws Exception {
+        // Initialize collections
+        mockIndexes = new ConcurrentHashMap<>();
+        mockResources = new ConcurrentHashMap<>();
+        createdResourcePaths = new ArrayList<>();
+        datasetResourceMap = new HashMap<>();
+
+        // Set up mocks
+        mockServiceContext = mock(INCServiceContext.class);
+        mockStorageProperties = mock(StorageProperties.class);
+        mockResourceRepository = mock(ILocalResourceRepository.class);
+        mockRecoveryManager = mock(IRecoveryManager.class);
+        mockLogManager = mock(ILogManager.class);
+        mockVBC = mock(IVirtualBufferCache.class);
+        mockCheckpointProvider = mock(IIndexCheckpointManagerProvider.class);
+        mockLockNotifier = mock(IDiskResourceCacheLockNotifier.class);
+        mockIOManager = mock(IIOManager.class);
+
+        // Mock behavior
+        when(mockServiceContext.getIoManager()).thenReturn(mockIOManager);
+        when(mockStorageProperties.getMemoryComponentsNum()).thenReturn(2);
+        when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(lazyRecoveryEnabled);
+
+        // Create DatasetLifecycleManager FIRST
+        datasetLifecycleManager =
+                new DatasetLifecycleManager(mockServiceContext, mockStorageProperties, mockResourceRepository,
+                        mockRecoveryManager, mockLogManager, mockVBC, mockCheckpointProvider, mockLockNotifier);
+
+        // NEXT get the datasets map via reflection
+        try {
+            Field datasetsField = DatasetLifecycleManager.class.getDeclaredField("datasets");
+            datasetsField.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            Map<Integer, DatasetResource> datasets =
+                    (Map<Integer, DatasetResource>) datasetsField.get(datasetLifecycleManager);
+            if (datasets != null) {
+                LOGGER.info("Accessed datasets map successfully, found {} entries", datasets.size());
+                // Store a direct reference to the actual map
+                datasetResourceMap = datasets;
+            } else {
+                LOGGER.warn("Retrieved datasets map is null");
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to access datasets field via reflection", e);
+            fail("Could not access the datasets map required by these tests: " + e);
+        }
+
+        // FINALLY setup mock resources (so they get the real datasets map)
+        setupMockResources();
+
+        executorService = Executors.newFixedThreadPool(NUM_THREADS);
+    }
+
+    /**
+     * Helper method to set the lazy recovery flag and reconfigure the mock
+     */
+    public void setLazyRecoveryEnabled(boolean enabled) {
+        this.lazyRecoveryEnabled = enabled;
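+        // Re-stubbing takes effect for all subsequent isLazyRecoveryEnabled() calls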
+        when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(enabled);
+        LOGGER.info("Set lazy recovery enabled to: {}", enabled);
+    }
+
+    /**
+     * Creates mock resources for testing.
+     * For each combination of dataset and partition:
+     * 1. Creates primary and secondary indexes with unique resource IDs
+     * 2. Sets up the repository to return the appropriate resources
+     * 3. Verifies that all resources have unique IDs
+     *
+     * This is crucial for testing the register/open/unregister lifecycle,
+     * as it ensures each resource path maps to a unique mock index.
+     */
+    private void setupMockResources() throws HyracksDataException {
+        LOGGER.debug("Setting up mock resources");
+
+        // Setup mock resources for different datasets and partitions
+        for (int i = 0; i < NUM_DATASETS; i++) {
+            // Use datasetId starting from 101 to avoid reserved IDs (below 100)
+            int datasetId = 101 + i;
+            String datasetName = "ds" + i;
+            for (int j = 0; j < NUM_PARTITIONS; j++) {
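+                // Resource IDs are derived as datasetId * 100 + partition * 10 (+1 for the
+                // secondary index), so every resource in the test gets a unique ID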
+                // Create primary index
+                String primaryPath = getResourcePath(j, datasetName, 0, datasetName);
+                setupMockResource(primaryPath, datasetId, j, datasetId * 100 + j * 10);
+
+                // Create secondary index
+                String secondaryPath = getResourcePath(j, datasetName, 1, datasetName + "_idx");
+                setupMockResource(secondaryPath, datasetId, j, datasetId * 100 + j * 10 + 1);
+
+                createdResourcePaths.add(primaryPath);
+                createdResourcePaths.add(secondaryPath);
+            }
+        }
+
+        // Wire up the mockResourceRepository to return our mock resources
+        when(mockResourceRepository.get(anyString())).thenAnswer(invocation -> {
+            String path = invocation.getArgument(0);
+            LocalResource resource = mockResources.get(path);
+            LOGGER.debug("get({}) returning {}", path, resource);
+            return resource;
+        });
+
+        LOGGER.debug("Created {} mock resources", mockResources.size());
+    }
+
+    /**
+     * Sets up a mock resource for a specific resource path.
+     * For each resource path, this method:
+     * 1. Creates a unique MockIndex instance
+     * 2. Creates a mock DatasetLocalResource that returns the MockIndex
+     * 3. Creates a mock LocalResource with the unique resourceId
+     * 4. Sets up the IO manager to resolve the resource path
+     *
+     * The key pattern here is that each resource path must map to a unique
+     * MockIndex, and each resource must have a unique ID.
+     */
+    private void setupMockResource(String resourcePath, int datasetId, int partition, long resourceId)
+            throws HyracksDataException {
+
+        // Create and store a mock index, passing the datasetResourceMap
+        MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
+        mockIndexes.put(resourcePath, mockIndex);
+
+        // Create a mock dataset local resource
+        DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
+        when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
+        when(mockDatasetResource.getPartition()).thenReturn(partition);
+
+        // Important: each createInstance call must return the MockIndex specific to this resource path
+        when(mockDatasetResource.createInstance(mockServiceContext))
+                .thenAnswer(invocation -> mockIndexes.get(resourcePath));
+
+        // Create a mock local resource
+        LocalResource mockLocalResource = mock(LocalResource.class);
+        when(mockLocalResource.getId()).thenReturn(resourceId);
+        when(mockLocalResource.getPath()).thenReturn(resourcePath);
+        when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
+
+        mockResources.put(resourcePath, mockLocalResource);
+
+        // Create a mock file reference
+        FileReference mockFileRef = mock(FileReference.class);
+        when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
+        when(mockIOManager.resolveAbsolutePath(resourcePath)).thenReturn(mockFileRef);
+    }
+
+    /**
+     * Generates a standardized resource path for a given partition, dataset, and index.
+     * Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/ResourceID_IndexName
+     * This ensures consistent resource path formatting throughout the tests.
+     *
+     * Note: The replication factor is always 0 in production environments, so we match that here.
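+     * Example: getResourcePath(0, "ds0", 0, "ds0") returns
+     * "storage/partition_0/Default/SDefault/ds0/0/0_ds0".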
+     */
+    private String getResourcePath(int partition, String datasetName, int resourceId, String indexName) {
+        // Replication factor is always 0; the resourceId is folded into the index name
+        return String.format("%s/partition_%d/%s/%s/%s/0/%s_%s", STORAGE_DIR, partition, DB_NAME, SCOPE_NAME,
+                datasetName, resourceId, indexName);
+    }
+
+    /**
+     * Cleans up after tests by shutting down the executor service.
+     */
+    @After
+    public void tearDown() throws Exception {
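+        // Shut down worker threads before stopping the manager so no in-flight operation races the shutdown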
+        executorService.shutdownNow();
+        executorService.awaitTermination(5, TimeUnit.SECONDS);
+        datasetLifecycleManager.stop(false, null);
+    }
+
+    /**
+     * Tests concurrent registration and opening of indexes.
+     * This test verifies that:
+     * 1. Multiple threads can concurrently register and open indexes without causing errors
+     * 2. Re-registration of an already registered index returns the same index instance
+     * 3. Indexes are properly activated when opened
+     * 4. All register and open operations properly maintain index state
+     */
+    @Test
+    public void testConcurrentRegisterIfAbsentAndOpen() throws Exception {
+        LOGGER.info("Starting testConcurrentRegisterAndOpen");
+
+        final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
+        final AtomicInteger successCount = new AtomicInteger(0);
+        final AtomicInteger errorCount = new AtomicInteger(0);
+        final Map<String, IIndex> firstRegistrations = new ConcurrentHashMap<>();
+        final List<Future<?>> futures = new ArrayList<>();
+
+        // Register half of the resources upfront
+        int preRegisteredCount = createdResourcePaths.size() / 2;
+        for (int i = 0; i < preRegisteredCount; i++) {
+            String resourcePath = createdResourcePaths.get(i);
+            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            firstRegistrations.put(resourcePath, index);
+        }
+
+        // Create threads that will randomly register/re-register and open indexes
+        for (int i = 0; i < NUM_THREADS; i++) {
+            futures.add(executorService.submit(() -> {
+                try {
+                    barrier.await(); // Ensure all threads start at the same time
+
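+                    // A per-thread Random avoids contention on a shared instance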
+                    Random random = new Random();
+                    for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
+                        // Pick a random resource path
+                        String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
+
+                        // Randomly choose between register or open
+                        if (random.nextBoolean()) {
+                            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+                            assertNotNull("Index should not be null after register", index);
+
+                            // If this path was previously registered, verify it's the same instance
+                            IIndex firstInstance = firstRegistrations.get(resourcePath);
+                            if (firstInstance != null) {
+                                assertSame("Re-registration should return the same index instance", firstInstance,
+                                        index);
+                            } else {
+                                // First time registering this path, record the instance
+                                firstRegistrations.putIfAbsent(resourcePath, index);
+                            }
+
+                            successCount.incrementAndGet();
+                        } else {
+                            // Register first if not already registered, then open
+                            if (!firstRegistrations.containsKey(resourcePath)) {
+                                IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+                                firstRegistrations.putIfAbsent(resourcePath, index);
+                            }
+
+                            datasetLifecycleManager.open(resourcePath);
+                            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
+                            assertNotNull("Index should not be null after open", index);
+                            assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
+                            successCount.incrementAndGet();
+                        }
+                    }
+                } catch (Exception e) {
+                    LOGGER.error("Worker thread failed", e);
+                    errorCount.incrementAndGet();
+                }
+            }));
+        }
+
+        // Wait for all threads to complete
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals("No unexpected errors should occur", 0, errorCount.get());
+        LOGGER.info("Completed testConcurrentRegisterAndOpen: {} successful operations", successCount.get());
+        assertTrue("Some operations should succeed", successCount.get() > 0);
+
+        // Verify all resources are now registered
+        for (String resourcePath : createdResourcePaths) {
+            IIndex index = firstRegistrations.get(resourcePath);
+            assertNotNull("All resources should be registered", index);
+
+            // Verify one final time that re-registration returns the same instance
+            IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            assertSame("Final re-registration should return the same index instance", index, reRegisteredIndex);
+        }
+    }
+
+    /**
+     * Tests concurrent register, open, and unregister operations.
+     * This test verifies that:
+     * 1. Multiple threads can concurrently perform all lifecycle operations (register, open, unregister)
+     * 2. Re-registration of an already registered index returns the same index instance
+     * 3. Resources can be properly unregistered after being opened and closed
+     * 4. Concurrent lifecycle operations do not interfere with each other
+     */
+    @Test
+    public void testConcurrentRegisterIfAbsentOpenAndUnregister() throws Exception {
+        LOGGER.info("Starting testConcurrentRegisterOpenAndUnregister with {} threads", NUM_THREADS);
+
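+        // Plain HashMap guarded by synchronized blocks; records the paths each thread has registered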
+        final Map<String, IIndex> registeredInstances = new HashMap<>();
+        final AtomicInteger successCount = new AtomicInteger(0);
+        final AtomicInteger expectedErrorCount = new AtomicInteger(0);
+        final AtomicInteger unexpectedErrorCount = new AtomicInteger(0);
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < NUM_THREADS; i++) {
+            futures.add(executorService.submit(() -> {
+                try {
+                    final Random random = new Random();
+                    for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
+                        // Pick a random resource path
+                        String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
+
+                        try {
+                            // Randomly choose between register, open, or unregister
+                            int op = random.nextInt(3);
+                            if (op == 0) {
+                                // Register index
+                                LOGGER.debug("Thread {} registering resource {}", Thread.currentThread().getId(),
+                                        resourcePath);
+
+                                IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+                                synchronized (registeredInstances) {
+                                    registeredInstances.put(resourcePath, index);
+                                }
+
+                                LOGGER.debug("Thread {} registered resource {}", Thread.currentThread().getId(),
+                                        resourcePath);
+                                successCount.incrementAndGet();
+                            } else if (op == 1) {
+                                // Open index
+                                LOGGER.debug("Thread {} opening resource {}", Thread.currentThread().getId(),
+                                        resourcePath);
+
+                                IIndex index;
+                                synchronized (registeredInstances) {
+                                    index = registeredInstances.get(resourcePath);
+                                }
+
+                                if (index != null) {
+                                    try {
+                                        datasetLifecycleManager.open(resourcePath);
+                                        LOGGER.debug("Thread {} opened resource {}", Thread.currentThread().getId(),
+                                                resourcePath);
+                                        successCount.incrementAndGet();
+                                    } catch (HyracksDataException e) {
+                                        if (e.getMessage() != null
+                                                && e.getMessage().contains("HYR0104: Index does not exist")) {
+                                            LOGGER.debug("Thread {} failed to open unregistered resource {}: {}",
+                                                    Thread.currentThread().getId(), resourcePath, e.getMessage());
+                                            expectedErrorCount.incrementAndGet();
+                                        } else {
+                                            LOGGER.error("Thread {} failed to open resource {}: {}",
+                                                    Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
+                                            unexpectedErrorCount.incrementAndGet();
+                                        }
+                                    }
+                                }
+                            } else {
+                                // Unregister index
+                                LOGGER.debug("Thread {} unregistering resource {}", Thread.currentThread().getId(),
+                                        resourcePath);
+
+                                try {
+                                    datasetLifecycleManager.unregister(resourcePath);
+                                    LOGGER.debug("Thread {} unregistered resource {}", Thread.currentThread().getId(),
+                                            resourcePath);
+                                    synchronized (registeredInstances) {
+                                        registeredInstances.remove(resourcePath);
+                                    }
+                                    successCount.incrementAndGet();
+                                } catch (HyracksDataException e) {
+                                    if (e.getMessage() != null
+                                            && (e.getMessage().contains("HYR0104: Index does not exist")
+                                                    || e.getMessage().contains("HYR0105: Cannot drop in-use index"))) {
+                                        LOGGER.debug("Thread {} failed to unregister resource {}: {}",
+                                                Thread.currentThread().getId(), resourcePath, e.getMessage());
+                                        expectedErrorCount.incrementAndGet();
+                                    } else {
+                                        LOGGER.error("Thread {} failed to unregister resource {}: {}",
+                                                Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
+                                        unexpectedErrorCount.incrementAndGet();
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {
+                            LOGGER.error("Thread {} encountered error on resource {}: {}",
+                                    Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
+                            unexpectedErrorCount.incrementAndGet();
+                        }
+                    }
+                } catch (Exception e) {
+                    LOGGER.error("Thread {} encountered unexpected error: {}", Thread.currentThread().getId(),
+                            e.getMessage(), e);
+                    unexpectedErrorCount.incrementAndGet();
+                }
+            }));
+        }
+
+        // Wait for all threads to complete
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        // Print final stats
+        int remainingRegistered = registeredInstances.size();
+        LOGGER.info(
+                "testConcurrentRegisterOpenAndUnregister completed - Success: {}, Expected Errors: {}, Unexpected Errors: {}, Remaining registered: {}",
+                successCount.get(), expectedErrorCount.get(), unexpectedErrorCount.get(), remainingRegistered);
+
+        // Check results
+        assertEquals("No unexpected errors should occur", 0, unexpectedErrorCount.get());
+        assertTrue("Some operations should succeed", successCount.get() > 0);
+        LOGGER.info("Expected errors occurred: {} - these are normal in concurrent environment",
+                expectedErrorCount.get());
+    }
+
+    /**
+     * Tests behavior when re-registering already registered indexes.
+     * This test verifies that:
+     * 1. Registering an index returns a valid index instance
+     * 2. Re-registering the same index returns the same instance (not a new one)
+     * 3. Concurrent re-registrations and opens work correctly
+     * 4. The identity of index instances is preserved throughout operations
+     */
+    @Test
+    public void testRegisterIfAbsentAlreadyRegisteredIndex() throws Exception {
+        LOGGER.info("Starting testRegisterAlreadyRegisteredIndex");
+
+        // Register all resources first
+        Map<String, IIndex> firstRegistrations = new HashMap<>();
+        for (String resourcePath : createdResourcePaths) {
+            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            assertNotNull("First registration should succeed", index);
+            firstRegistrations.put(resourcePath, index);
+
+            // Verify the returned index is the same as our mock index
+            MockIndex mockIndex = mockIndexes.get(resourcePath);
+            assertSame("Register should return our mock index for " + resourcePath, mockIndex, index);
+        }
+
+        // Try to register again - should return the same instances without re-registering
+        for (String resourcePath : createdResourcePaths) {
+            IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            assertNotNull("Re-registration should succeed", reRegisteredIndex);
+            // Verify we get back the same instance (no re-registration occurred)
+            assertSame("Re-registration should return the same index instance", firstRegistrations.get(resourcePath),
+                    reRegisteredIndex);
+
+            // Double check it's still our mock index
+            MockIndex mockIndex = mockIndexes.get(resourcePath);
+            assertSame("Re-register should still return our mock index for " + resourcePath, mockIndex,
+                    reRegisteredIndex);
+        }
+
+        // Now try concurrent open and re-register operations
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch completionLatch = new CountDownLatch(NUM_THREADS);
+        final AtomicInteger openSuccesses = new AtomicInteger(0);
+        final ConcurrentHashMap<String, Integer> reRegisterCounts = new ConcurrentHashMap<>();
+
+        for (int i = 0; i < NUM_THREADS; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+
+                    String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
+
+                    // Even threads try to open, odd threads try to re-register
+                    if (threadId % 2 == 0) {
+                        datasetLifecycleManager.open(resourcePath);
+                        ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
+                        assertNotNull("Index should not be null after open", index);
+                        assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
+                        openSuccesses.incrementAndGet();
+                    } else {
+                        IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+                        assertNotNull("Re-registration should succeed", reRegisteredIndex);
+                        // Keep track of re-registrations
+                        reRegisterCounts.compute(resourcePath, (k, v) -> v == null ? 1 : v + 1);
+                        // Verify it's the same instance as the original registration
+                        assertSame("Re-registration should return the same index instance",
+                                firstRegistrations.get(resourcePath), reRegisteredIndex);
+                    }
+                } catch (Exception e) {
+                    LOGGER.error("Worker thread failed", e);
+                } finally {
+                    completionLatch.countDown();
+                }
+            });
+        }
+
+        // Start all threads
+        startLatch.countDown();
+
+        // Wait for completion
+        assertTrue("Threads should complete within timeout", completionLatch.await(10, TimeUnit.SECONDS));
+
+        // Verify results
+        assertEquals("Half of threads should succeed in opening", NUM_THREADS / 2, openSuccesses.get());
+
+        // Verify that re-registration attempts occurred for each resource
+        for (String path : createdResourcePaths) {
+            // Some resources should have been re-registered multiple times
+            Integer reRegCount = reRegisterCounts.get(path);
+            if (reRegCount != null) {
+                LOGGER.info("Resource path {} was re-registered {} times", path, reRegCount);
+            }
+        }
+
+        LOGGER.info("Completed testRegisterAlreadyRegisteredIndex: {} successful opens", openSuccesses.get());
+    }
+
+    /**
+     * Tests concurrent opening of the same index by multiple threads.
+     * This test verifies that:
+     * 1. Multiple threads can concurrently open the same index without errors
+     * 2. The index is properly activated after being opened
+     * 3. Concurrent opens do not interfere with each other
+     */
+    @Test
+    public void testConcurrentOpenSameIndex() throws Exception {
+        LOGGER.info("Starting testConcurrentOpenSameIndex");
+
+        final int THREADS = 10;
+        final String RESOURCE_PATH = createdResourcePaths.get(0); // First created resource path
+        final CountDownLatch startLatch = new CountDownLatch(1);
+        final CountDownLatch completionLatch = new CountDownLatch(THREADS);
+        final AtomicInteger errors = new AtomicInteger(0);
+
+        // Register the index first - REQUIRED before open
+        datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
+
+        // Create threads that will all open the same index
+        for (int i = 0; i < THREADS; i++) {
+            executorService.submit(() -> {
+                try {
+                    startLatch.await();
+                    datasetLifecycleManager.open(RESOURCE_PATH);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    errors.incrementAndGet();
+                } finally {
+                    completionLatch.countDown();
+                }
+            });
+        }
+
+        // Start all threads simultaneously
+        startLatch.countDown();
+
+        // Wait for all threads to complete
+        assertTrue("Threads should complete within timeout", completionLatch.await(10, TimeUnit.SECONDS));
+
+        assertEquals("No errors should occur when opening the same index concurrently", 0, errors.get());
+
+        // Verify the index is actually open
+        ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(RESOURCE_PATH);
+        assertNotNull("Index should be retrievable", index);
+        assertTrue("Index should be activated", ((MockIndex) index).isActivated());
+
+        LOGGER.info("Completed testConcurrentOpenSameIndex: index activated={}", ((MockIndex) index).isActivated());
+    }
+
+    /**
+     * Tests concurrent closing of an index after it has been opened.
+     * This test verifies that:
+     * 1. An index can be closed after being opened
+     * 2. Multiple threads can attempt to close the same index
+     * 3. The index is properly deactivated after being closed
+     */
+    @Test
+    public void testConcurrentCloseAfterOpen() throws Exception {
+        LOGGER.info("Starting testConcurrentCloseAfterOpen");
+
+        final String RESOURCE_PATH = createdResourcePaths.get(1); // Second created resource path
+        final CountDownLatch openLatch = new CountDownLatch(1);
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        final AtomicBoolean openSuccess = new AtomicBoolean(false);
+        final AtomicBoolean closeSuccess = new AtomicBoolean(false);
+
+        // Register the index first
+        datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
+
+        // Thread to open the index
+        executorService.submit(() -> {
+            try {
+                datasetLifecycleManager.open(RESOURCE_PATH);
+                openSuccess.set(true);
+                openLatch.countDown();
+
+                // Wait a bit to simulate work with the open index
+                Thread.sleep(100);
+            } catch (Exception e) {
+                LOGGER.error("Open operation failed", e);
+                fail("Open operation failed: " + e.getMessage());
+            }
+        });
+
+        // Thread to close the index after it's opened
+        executorService.submit(() -> {
+            try {
+                openLatch.await(); // Wait for open to complete
+                datasetLifecycleManager.close(RESOURCE_PATH);
+                closeSuccess.set(true);
+                closeLatch.countDown();
+            } catch (Exception e) {
+                LOGGER.error("Close operation failed", e);
+                fail("Close operation failed: " + e.getMessage());
+            }
+        });
+
+        // Wait for operations to complete
+        assertTrue("Close should complete within timeout", closeLatch.await(5, TimeUnit.SECONDS));
+
+        assertTrue("Open operation should succeed", openSuccess.get());
+        assertTrue("Close operation should succeed", closeSuccess.get());
+
+        LOGGER.info("Completed testConcurrentCloseAfterOpen: openSuccess={}, closeSuccess={}", openSuccess.get(),
+                closeSuccess.get());
+    }
+
+    /**
+     * Tests the identity and uniqueness of mock objects and resources.
+     * This test verifies that:
+     * 1. Each resource path has a unique MockIndex instance
+     * 2. Each LocalResource has a unique ID
+     * 3. The createInstance method returns the correct MockIndex for each resource path
+     * 4. Registration returns the expected MockIndex instance
+     *
+     * This test is crucial for ensuring the test infrastructure itself is correctly set up.
+     */
+    @Test
+    public void testMockIndexIdentity() throws HyracksDataException {
+        LOGGER.info("Starting testMockIndexIdentity");
+
+        // Verify that the mockIndexes map is correctly populated
+        assertTrue("Mock indexes should be created", !mockIndexes.isEmpty());
+        assertEquals("Should have created mocks for all resource paths", createdResourcePaths.size(),
+                mockIndexes.size());
+
+        // Verify each local resource has a unique ID
+        Set<Long> resourceIds = new HashSet<>();
+        for (LocalResource resource : mockResources.values()) {
+            long id = resource.getId();
+            if (!resourceIds.add(id)) {
+                fail("Duplicate resource ID found: " + id + " for path: " + resource.getPath());
+            }
+        }
+
+        // For each resource path, verify the mock setup
+        for (String resourcePath : createdResourcePaths) {
+            // Check if we have a mock index for this path
+            MockIndex expectedMockIndex = mockIndexes.get(resourcePath);
+            assertNotNull("Should have a mock index for " + resourcePath, expectedMockIndex);
+
+            // Get the local resource
+            LocalResource lr = mockResourceRepository.get(resourcePath);
+            assertNotNull("Should have a local resource for " + resourcePath, lr);
+
+            // Print resource ID for verification
+            LOGGER.info("Resource path: {}, ID: {}", resourcePath, lr.getId());
+
+            // Get the dataset resource
+            DatasetLocalResource datasetResource = (DatasetLocalResource) lr.getResource();
+            assertNotNull("Should have a dataset resource for " + resourcePath, datasetResource);
+
+            // Call createInstance and verify we get our mock index
+            IIndex createdIndex = datasetResource.createInstance(mockServiceContext);
+            assertSame("createInstance should return our mock index for " + resourcePath, expectedMockIndex,
+                    createdIndex);
+
+            // Now register and verify we get the same mock index
+            IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            assertSame("Registered index should be our mock index for " + resourcePath, expectedMockIndex,
+                    registeredIndex);
+
+            // Confirm the reference equality of these objects
+            LOGGER.info("Path: {}", resourcePath);
+            LOGGER.info("  Expected mock: {}", System.identityHashCode(expectedMockIndex));
+            LOGGER.info("  Created instance: {}", System.identityHashCode(createdIndex));
+            LOGGER.info("  Registered instance: {}", System.identityHashCode(registeredIndex));
+        }
+
+        LOGGER.info("Completed testMockIndexIdentity: verified {} resources", createdResourcePaths.size());
+    }
+
+    /**
+     * Tests that unregistering indexes works correctly, but only after they have been properly registered and opened.
+     * This test verifies that:
+     * 1. Indexes can be registered and opened successfully
+     * 2. All indexes are properly activated after opening
+     * 3. Indexes can be unregistered after being registered and opened
+     * 4. After unregistering, the indexes are no longer retrievable
+     * 5. The correct lifecycle sequence (register → open → unregister) is enforced
+     */
+    @Test
+    public void testUnregisterAfterRegisterIfAbsentAndOpen() throws Exception {
+        LOGGER.info("Starting testUnregisterAfterRegisterAndOpen");
+
+        Map<String, IIndex> registeredIndexes = new HashMap<>();
+        int totalIndexes = createdResourcePaths.size();
+        AtomicInteger successfulUnregisters = new AtomicInteger(0);
+
+        // Step 1: Register and open all indexes sequentially
+        for (String resourcePath : createdResourcePaths) {
+            LOGGER.debug("Registering and opening: {}", resourcePath);
+
+            // Register the index
+            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            assertNotNull("Index should be registered successfully", index);
+            registeredIndexes.put(resourcePath, index);
+
+            // After registration, we should have a DatasetResource
+            LocalResource resource = mockResources.get(resourcePath);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+            int datasetId = datasetLocalResource.getDatasetId();
+            int partition = datasetLocalResource.getPartition();
+
+            // Check if we have a DatasetResource in our map
+            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+            if (datasetResource != null) {
+                LOGGER.debug("Found DatasetResource for dataset {}", datasetId);
+
+                // See if there's an operation tracker for this partition already
+                ILSMOperationTracker existingTracker = null;
+                try {
+                    // Use reflection to get the datasetPrimaryOpTrackers map
+                    Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                    opTrackersField.setAccessible(true);
+                    Map<Integer, ILSMOperationTracker> opTrackers =
+                            (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+                    // Check if we already have a tracker for this partition
+                    existingTracker = opTrackers.get(partition);
+                    LOGGER.debug("Existing tracker for partition {}: {}", partition, existingTracker);
+                } catch (Exception e) {
+                    LOGGER.error("Failed to access opTrackers field via reflection", e);
+                }
+            } else {
+                LOGGER.debug("No DatasetResource found for dataset {}", datasetId);
+            }
+
+            // Open the index
+            datasetLifecycleManager.open(resourcePath);
+
+            // Verify index is activated
+            MockIndex mockIndex = (MockIndex) index;
+            assertTrue("Index should be activated after opening: " + resourcePath, mockIndex.isActivated());
+
+            // Verify it's retrievable
+            long resourceId = resource.getId();
+            IIndex retrievedIndex = datasetLifecycleManager.getIndex(datasetId, resourceId);
+            assertSame("Retrieved index should be the same instance", index, retrievedIndex);
+        }
+
+        LOGGER.info("All {} indexes registered and opened successfully", totalIndexes);
+
+        // Step 2: Close and then unregister each index
+        for (String resourcePath : createdResourcePaths) {
+            LOGGER.debug("Closing and unregistering: {}", resourcePath);
+
+            // Verify the index is activated before closing
+            IIndex index = registeredIndexes.get(resourcePath);
+            MockIndex mockIndex = (MockIndex) index;
+            assertTrue("Index should be activated before closing: " + resourcePath, mockIndex.isActivated());
+
+            // Get resource information before closing
+            LocalResource resource = mockResources.get(resourcePath);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+            int datasetId = datasetLocalResource.getDatasetId();
+            int partition = datasetLocalResource.getPartition();
+
+            // Close the index first to ensure reference count goes to 0
+            LOGGER.debug("Closing index: {}", resourcePath);
+            datasetLifecycleManager.close(resourcePath);
+
+            // Brief pause to allow any asynchronous operations to complete
+            Thread.sleep(50);
+
+            // Capture the operation tracker for diagnostic logging before unregistering
+            ILSMOperationTracker opTracker = mockIndex.getOperationTracker();
+
+            LOGGER.debug("Before unregister: Resource path={}, datasetId={}, partition={}, tracker={}", resourcePath,
+                    datasetId, partition, opTracker);
+
+            // Unregister the index after it's closed
+            LOGGER.debug("Unregistering index: {}", resourcePath);
+            try {
+                datasetLifecycleManager.unregister(resourcePath);
+                successfulUnregisters.incrementAndGet();
+                LOGGER.debug("Successfully unregistered index: {}", resourcePath);
+            } catch (Exception e) {
+                LOGGER.error("Failed to unregister index: {}", resourcePath, e);
+                fail("Failed to unregister index: " + resourcePath + ": " + e.getMessage());
+            }
+
+            // Verify the index is no longer retrievable
+            try {
+                LocalResource resourceAfterUnregister = mockResources.get(resourcePath);
+                long resourceIdAfterUnregister = resourceAfterUnregister.getId();
+                IIndex retrievedIndexAfterUnregister =
+                        datasetLifecycleManager.getIndex(datasetId, resourceIdAfterUnregister);
+                Assert.assertNull("Index should not be retrievable after unregister: " + resourcePath,
+                        retrievedIndexAfterUnregister);
+            } catch (HyracksDataException e) {
+                // This is also an acceptable outcome if getIndex throws an exception for unregistered indexes
+                LOGGER.debug("Expected exception when retrieving unregistered index: {}", e.getMessage());
+            }
+        }
+
+        assertEquals("All indexes should be unregistered", totalIndexes, successfulUnregisters.get());
+        LOGGER.info("Completed testUnregisterAfterRegisterAndOpen: successfully unregistered {} indexes",
+                successfulUnregisters.get());
+    }
+
+    /**
+     * Tests that only one primary operation tracker is created per partition per dataset.
+     * This test verifies that:
+     * 1. When multiple indexes for the same dataset and partition are registered, they share the same operation tracker
+     * 2. Different partitions of the same dataset have different operation trackers
+     * 3. Different datasets have completely separate operation trackers
+     */
+    @Test
+    public void testPrimaryOperationTrackerSingularity() throws Exception {
+        LOGGER.info("Starting testPrimaryOperationTrackerSingularity");
+
+        // Maps to track operation trackers by dataset and partition
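+        // Shape: datasetId -> (partition -> tracker instance)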
+        Map<Integer, Map<Integer, Object>> datasetToPartitionToTracker = new HashMap<>();
+        Map<String, IIndex> registeredIndexes = new HashMap<>();
+
+        // Step 1: Register all indexes first
+        for (String resourcePath : createdResourcePaths) {
+            LOGGER.debug("Registering index: {}", resourcePath);
+
+            // Register the index
+            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+            assertNotNull("Index should be registered successfully", index);
+            registeredIndexes.put(resourcePath, index);
+
+            // Get dataset and partition information
+            LocalResource resource = mockResources.get(resourcePath);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+            int datasetId = datasetLocalResource.getDatasetId();
+            int partition = datasetLocalResource.getPartition();
+
+            // Check if we have a DatasetResource in our map
+            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+            assertNotNull("DatasetResource should be created for dataset " + datasetId, datasetResource);
+
+            // At this point, operation tracker might not exist yet since it's created lazily during open()
+            try {
+                // Use reflection to get the datasetPrimaryOpTrackers map
+                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                opTrackersField.setAccessible(true);
+                Map<Integer, ILSMOperationTracker> opTrackers =
+                        (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+                // Check if tracker already exists (might be null)
+                Object existingTracker = opTrackers.get(partition);
+                LOGGER.debug("Before open(): Tracker for dataset {} partition {}: {}", datasetId, partition,
+                        existingTracker);
+            } catch (Exception e) {
+                LOGGER.error("Failed to access opTrackers field via reflection", e);
+            }
+        }
+
+        // Step 2: Open all indexes to trigger operation tracker creation
+        for (String resourcePath : createdResourcePaths) {
+            LOGGER.debug("Opening index: {}", resourcePath);
+
+            // Open the index to trigger operation tracker creation
+            datasetLifecycleManager.open(resourcePath);
+
+            // Get dataset and partition information
+            LocalResource resource = mockResources.get(resourcePath);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+            int datasetId = datasetLocalResource.getDatasetId();
+            int partition = datasetLocalResource.getPartition();
+
+            // Now the operation tracker should exist
+            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+            Object opTracker = null;
+            try {
+                // Use reflection to get the datasetPrimaryOpTrackers map
+                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                opTrackersField.setAccessible(true);
+                Map<Integer, ILSMOperationTracker> opTrackers =
+                        (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+                // Get the tracker for this partition
+                opTracker = opTrackers.get(partition);
+                assertNotNull("Operation tracker should exist after open() for partition " + partition, opTracker);
+
+                LOGGER.debug("After open(): Found tracker for dataset {} partition {}: {}", datasetId, partition,
+                        System.identityHashCode(opTracker));
+            } catch (Exception e) {
+                LOGGER.error("Failed to access opTrackers field via reflection", e);
+                fail("Failed to access operation trackers: " + e.getMessage());
+            }
+
+            // Store the tracker in our map for later comparison
+            datasetToPartitionToTracker.computeIfAbsent(datasetId, k -> new HashMap<>()).putIfAbsent(partition,
+                    opTracker);
+        }
+
+        // Step 3: Create and register secondary indexes for each dataset/partition
+        List<String> secondaryPaths = new ArrayList<>();
+        for (String resourcePath : createdResourcePaths) {
+            // Create a "secondary index" path by appending "-secondary" to the resource path
+            String secondaryPath = resourcePath + "-secondary";
+            secondaryPaths.add(secondaryPath);
+
+            // Create a mock resource for this secondary index
+            LocalResource originalResource = mockResources.get(resourcePath);
+            DatasetLocalResource originalDatasetLocalResource = (DatasetLocalResource) originalResource.getResource();
+            int datasetId = originalDatasetLocalResource.getDatasetId();
+            int partition = originalDatasetLocalResource.getPartition();
+
+            // Set up a new mock resource with the same dataset and partition but a different path
+            setupMockResource(secondaryPath, datasetId, partition, originalResource.getId() + 10000);
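+            // The +10000 offset keeps secondary resource IDs disjoint from the primaries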
+
+            LOGGER.debug("Registering secondary index: {}", secondaryPath);
+
+            // Register the secondary index
+            IIndex secondaryIndex = datasetLifecycleManager.registerIfAbsent(secondaryPath, null);
+            assertNotNull("Secondary index should be registered successfully", secondaryIndex);
+            registeredIndexes.put(secondaryPath, secondaryIndex);
+        }
+
+        // Step 4: Open the secondary indexes to trigger operation tracker reuse
+        for (String secondaryPath : secondaryPaths) {
+            LOGGER.debug("Opening secondary index: {}", secondaryPath);
+
+            // Open the secondary index
+            datasetLifecycleManager.open(secondaryPath);
+
+            // Get dataset and partition information
+            LocalResource resource = mockResources.get(secondaryPath);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+            int datasetId = datasetLocalResource.getDatasetId();
+            int partition = datasetLocalResource.getPartition();
+
+            // Get the operation tracker after opening
+            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+            Object secondaryOpTracker = null;
+            try {
+                // Use reflection to get the datasetPrimaryOpTrackers map
+                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                opTrackersField.setAccessible(true);
+                Map<Integer, ILSMOperationTracker> opTrackers =
+                        (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+                // Get the tracker for this partition
+                secondaryOpTracker = opTrackers.get(partition);
+                assertNotNull("Operation tracker should exist for secondary index partition " + partition,
+                        secondaryOpTracker);
+
+                LOGGER.debug("After opening secondary: Found tracker for dataset {} partition {}: {}", datasetId,
+                        partition, System.identityHashCode(secondaryOpTracker));
+            } catch (Exception e) {
+                LOGGER.error("Failed to access opTrackers field via reflection", e);
+                fail("Failed to access operation trackers: " + e.getMessage());
+            }
+
+            // Verify this is the same tracker as the one used by the primary index
+            Object originalTracker = datasetToPartitionToTracker.get(datasetId).get(partition);
+            assertSame("Operation tracker should be reused for the same dataset/partition", originalTracker,
+                    secondaryOpTracker);
+        }
+
+        // Step 5: Verify that different partitions of the same dataset have different operation trackers
+        for (Map.Entry<Integer, Map<Integer, Object>> datasetEntry : datasetToPartitionToTracker.entrySet()) {
+            int datasetId = datasetEntry.getKey();
+            Map<Integer, Object> partitionTrackers = datasetEntry.getValue();
+
+            if (partitionTrackers.size() > 1) {
+                LOGGER.debug("Verifying different trackers for different partitions in dataset {}", datasetId);
+
+                // Collect all the tracker instances for this dataset
+                Set<Object> uniqueTrackers = new HashSet<>(partitionTrackers.values());
+
+                // The number of unique trackers should equal the number of partitions
+                assertEquals("Each partition should have its own operation tracker", partitionTrackers.size(),
+                        uniqueTrackers.size());
+            }
+        }
+
+        // Step 6: Verify that different datasets have different operation trackers for the same partition
+        if (datasetToPartitionToTracker.size() > 1) {
+            for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
+                final int partitionId = partition;
+
+                // Collect all trackers for this partition across different datasets
+                List<Object> trackersForPartition = datasetToPartitionToTracker.entrySet().stream()
+                        .filter(entry -> entry.getValue().containsKey(partitionId))
+                        .map(entry -> entry.getValue().get(partitionId)).collect(Collectors.toList());
+
+                if (trackersForPartition.size() > 1) {
+                    LOGGER.debug("Verifying different trackers across datasets for partition {}", partitionId);
+
+                    // All trackers should be unique (no sharing between datasets)
+                    Set<Object> uniqueTrackers = new HashSet<>(trackersForPartition);
+                    assertEquals("Each dataset should have its own operation tracker for partition " + partitionId,
+                            trackersForPartition.size(), uniqueTrackers.size());
+                }
+            }
+        }
+
+        // Step 7: Clean up - close all indexes
+        for (String path : new ArrayList<>(registeredIndexes.keySet())) {
+            try {
+                datasetLifecycleManager.close(path);
+            } catch (Exception e) {
+                LOGGER.error("Error closing index {}", path, e);
+            }
+        }
+
+        LOGGER.info(
+                "Completed testPrimaryOperationTrackerSingularity: verified unique trackers for each dataset/partition combination");
+    }
+
+    /**
+     * Tests that resources cannot be opened after unregistering unless re-registered first.
+     * This test verifies:
+     * 1. Resources must be registered before they can be opened
+     * 2. After unregistering, resources cannot be opened until re-registered
+     * 3. Re-registration of an unregistered resource works correctly
+     */
+    @Test
+    public void testCannotOpenUnregisteredResource() throws Exception {
+        LOGGER.info("Starting testCannotOpenUnregisteredResource");
+
+        String resourcePath = createdResourcePaths.get(0);
+
+        // Register resource first
+        IIndex index = registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully", index);
+        LOGGER.debug("Registered resource {}", resourcePath);
+
+        // Unregister it
+        datasetLifecycleManager.unregister(resourcePath);
+        LOGGER.debug("Unregistered resource {}", resourcePath);
+
+        // Now try to open it - should fail
+        try {
+            datasetLifecycleManager.open(resourcePath);
+            fail("Should not be able to open an unregistered resource");
+        } catch (HyracksDataException e) {
+            LOGGER.debug("Caught expected exception: {}", e.getMessage());
+            assertTrue("Exception should mention index not existing", e.getMessage().contains("does not exist"));
+        }
+
+        // Re-register should work
+        MockIndex mockIndex = mockIndexes.get(resourcePath);
+        IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
+        assertNotNull("Index should be re-registered successfully", reRegisteredIndex);
+        LOGGER.debug("Re-registered resource {}", resourcePath);
+
+        // Now open should work
+        datasetLifecycleManager.open(resourcePath);
+        LOGGER.debug("Successfully opened resource after re-registering");
+
+        LOGGER.info("Completed testCannotOpenUnregisteredResource");
+    }
+
+    /**
+     * Tests the full lifecycle of operation trackers to ensure they're properly created,
+     * maintained throughout the index lifecycle, and cleaned up at the appropriate time.
+     * This test verifies that:
+     * 1. Operation trackers are created when indexes are opened for the first time
+     * 2. Operation trackers remain stable during index usage
+     * 3. Operation trackers are properly shared among indexes of the same dataset/partition
+     * 4. Operation trackers are not prematurely nullified or cleared
+     * 5. Operation trackers are correctly removed when all indexes of a dataset/partition are unregistered
+     */
+    @Test
+    public void testOperationTrackerLifecycle() throws Exception {
+        // Register primary index
+        String dsName = "ds0";
+        int dsId = 101; // First datasetId assigned in setupMockResources (101 + i)
+        int partition = 0;
+        String primaryIndexName = dsName;
+        String secondaryIndexName = dsName + "_idx";
+        String primaryResourcePath = getResourcePath(partition, dsName, 0, primaryIndexName);
+        String secondaryResourcePath = getResourcePath(partition, dsName, 1, secondaryIndexName);
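+        // These paths must match the ones built in setupMockResources (resourceId 0 = primary, 1 = secondary)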
+
+        LOGGER.debug("Registering primary index at {}", primaryResourcePath);
+        MockIndex mockPrimaryIndex = mockIndexes.get(primaryResourcePath);
+        IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(primaryResourcePath, mockPrimaryIndex);
+        assertNotNull("Index should be registered successfully", registeredIndex);
+
+        // Get the dataset resource and check if it has been created
+        DatasetResource datasetResource = datasetResourceMap.get(dsId);
+        assertNotNull("Dataset resource should be created during registration", datasetResource);
+
+        // The tracker is created lazily during open(), so it may not exist before opening
+        ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
+        LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
+
+        // Open primary index which should create and register an operation tracker
+        LOGGER.debug("Opening primary index at {}", primaryResourcePath);
+        datasetLifecycleManager.open(primaryResourcePath);
+
+        // Verify operation tracker exists after opening
+        ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
+        LOGGER.debug("Operation tracker after primary open: {}", trackerAfterOpen);
+        assertNotNull("Operation tracker should be created after opening primary index", trackerAfterOpen);
+
+        // Fetch the mock index and confirm open() activated it
+        MockIndex mockIndex = mockIndexes.get(primaryResourcePath);
+        assertTrue("Mock index should be activated after open", mockIndex.isActivated());
+
+        // Compare with the mock tracker from the index
+        ILSMOperationTracker mockTracker = mockIndex.getOperationTracker();
+        LOGGER.debug("Mock tracker from index: {}", mockTracker);
+        assertEquals("Operation tracker should be the one from the mock index", mockTracker, trackerAfterOpen);
+
+        // Open secondary index which should reuse the same operation tracker
+        LOGGER.debug("Registering secondary index at {}", secondaryResourcePath);
+        MockIndex mockSecondaryIndex = mockIndexes.get(secondaryResourcePath);
+        datasetLifecycleManager.registerIfAbsent(secondaryResourcePath, mockSecondaryIndex);
+
+        LOGGER.debug("Opening secondary index at {}", secondaryResourcePath);
+        datasetLifecycleManager.open(secondaryResourcePath);
+
+        // Verify operation tracker is still the same after opening secondary
+        ILSMOperationTracker trackerAfterSecondaryOpen = getOperationTracker(datasetResource, partition);
+        LOGGER.debug("Operation tracker after secondary open: {}", trackerAfterSecondaryOpen);
+        assertNotNull("Operation tracker should still exist after opening secondary index", trackerAfterSecondaryOpen);
+        assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterSecondaryOpen);
+
+        // Close primary index - should keep the tracker as secondary is still open
+        LOGGER.debug("Closing primary index at {}", primaryResourcePath);
+        datasetLifecycleManager.close(primaryResourcePath);
+
+        // Verify operation tracker still exists
+        ILSMOperationTracker trackerAfterPrimaryClose = getOperationTracker(datasetResource, partition);
+        LOGGER.debug("Operation tracker after primary close: {}", trackerAfterPrimaryClose);
+        assertNotNull("Operation tracker should still exist after closing primary index", trackerAfterPrimaryClose);
+        assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterPrimaryClose);
+
+        // Close secondary index - should remove the tracker as all indexes are closed
+        LOGGER.debug("Closing secondary index at {}", secondaryResourcePath);
+        datasetLifecycleManager.close(secondaryResourcePath);
+
+        // Observe the operation tracker after all indexes are closed (logged for inspection)
+        ILSMOperationTracker trackerAfterSecondaryClose = getOperationTracker(datasetResource, partition);
+        LOGGER.debug("Operation tracker after secondary close: {}", trackerAfterSecondaryClose);
+
+        // Unregister indexes
+        LOGGER.debug("Unregistering primary index at {}", primaryResourcePath);
+        datasetLifecycleManager.unregister(primaryResourcePath);
+
+        LOGGER.debug("Unregistering secondary index at {}", secondaryResourcePath);
+        datasetLifecycleManager.unregister(secondaryResourcePath);
+
+        // Observe the dataset resource after unregistration (logged for inspection)
+        DatasetResource datasetResourceAfterUnregister = datasetResourceMap.get(dsId);
+        LOGGER.debug("Dataset resource after unregister: {}", datasetResourceAfterUnregister);
+    }
+
+    /**
+     * Helper method to get the operation tracker for a dataset/partition.
+     * Uses reflection to access the private datasetPrimaryOpTrackers map in DatasetResource.
+     */
+    private ILSMOperationTracker getOperationTracker(DatasetResource datasetResource, int partition) {
+        try {
+            // Access the datasetPrimaryOpTrackers map through reflection
+            Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+            opTrackersField.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            Map<Integer, ILSMOperationTracker> opTrackers =
+                    (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+            // Return the operation tracker for this partition, if it exists
+            if (opTrackers != null) {
+                ILSMOperationTracker tracker = opTrackers.get(partition);
+                if (tracker != null) {
+                    // Make sure we're getting a PrimaryIndexOperationTracker
+                    if (!(tracker instanceof PrimaryIndexOperationTracker)) {
+                        LOGGER.warn("Tracker is not a PrimaryIndexOperationTracker: {}", tracker.getClass().getName());
+                    }
+                }
+                return tracker;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Failed to get operation tracker via reflection", e);
+        }
+        return null;
+    }
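+
+    /**
+     * Convenience overload (a sketch, not used by every test): returns the tracker for
+     * partition 0, the partition most tests in this class operate on.
+     */
+    private ILSMOperationTracker getOperationTracker(DatasetResource datasetResource) {
+        return getOperationTracker(datasetResource, 0);
+    }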
+
+    /**
+     * Tests that operation trackers are properly attached to indexes and survive open and close operations.
+     * This test verifies:
+     * 1. Operation trackers are properly created and associated with indexes
+     * 2. Operation trackers remain valid during the index lifecycle
+     * 3. Multiple opens and closes don't invalidate the operation tracker
+     */
+    @Test
+    public void testOperationTrackerAttachment() throws Exception {
+        LOGGER.info("Starting testOperationTrackerAttachment");
+
+        // Choose a specific resource path to test with
+        String resourcePath = createdResourcePaths.get(0);
+
+        // Get dataset and partition information for this resource
+        LocalResource resource = mockResources.get(resourcePath);
+        DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+        int datasetId = datasetLocalResource.getDatasetId();
+        int partition = datasetLocalResource.getPartition();
+
+        // Register the resource first using our helper
+        IIndex index = registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully", index);
+        LOGGER.debug("Registered resource {}", resourcePath);
+
+        // Get the dataset resource
+        DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+        assertNotNull("DatasetResource should exist after registration", datasetResource);
+
+        // Capture the operation tracker state before opening (expected to be absent)
+        ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
+        LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
+
+        // Open the index - this should create and attach an operation tracker
+        datasetLifecycleManager.open(resourcePath);
+        LOGGER.debug("Opened resource {}", resourcePath);
+
+        // Verify the operation tracker exists and is attached to the MockIndex
+        ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
+        assertNotNull("Operation tracker should exist after open", trackerAfterOpen);
+        LOGGER.debug("Operation tracker after open: {}", trackerAfterOpen);
+
+        // Get the mock index and verify its tracker matches
+        MockIndex mockIndex = mockIndexes.get(resourcePath);
+        assertTrue("Mock index should be activated", mockIndex.isActivated());
+
+        // Verify the mock index has the same operation tracker
+        ILSMOperationTracker indexTracker = mockIndex.getOperationTracker();
+        assertSame("Mock index should have the same operation tracker", trackerAfterOpen, indexTracker);
+
+        // Close and unregister for cleanup
+        datasetLifecycleManager.close(resourcePath);
+        datasetLifecycleManager.unregister(resourcePath);
+
+        LOGGER.info("Completed testOperationTrackerAttachment");
+    }
+
+    /**
+     * A mock implementation of ILSMIndex for testing. Most lifecycle and I/O methods are
+     * no-ops; only activation state and the operation tracker are meaningfully implemented.
+     */
+    private static class MockIndex implements ILSMIndex {
+        private final String resourcePath;
+        private final int datasetId;
+        private boolean activated = false;
+        // Create a mock of the specific PrimaryIndexOperationTracker class instead of the generic interface
+        private PrimaryIndexOperationTracker mockTracker = mock(PrimaryIndexOperationTracker.class);
+        private final Map<Integer, DatasetResource> datasetResourceMap;
+
+        public MockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
+            this.resourcePath = resourcePath;
+            this.datasetId = datasetId;
+            this.datasetResourceMap = datasetResourceMap;
+        }
+
+        @Override
+        public void create() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void activate() {
+            LOGGER.debug("Activating {}", this);
+            activated = true;
+        }
+
+        @Override
+        public void clear() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void deactivate() throws HyracksDataException {
+            LOGGER.debug("Deactivating {}", this);
+            activated = false;
+        }
+
+        @Override
+        public void destroy() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void purge() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void deactivate(boolean flushOnExit) {
+            LOGGER.debug("Deactivating {} with flushOnExit={}", this, flushOnExit);
+            activated = false;
+        }
+
+        @Override
+        public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public void validate() throws HyracksDataException {
+
+        }
+
+        @Override
+        public IBufferCache getBufferCache() {
+            return null;
+        }
+
+        @Override
+        public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+                boolean checkIfEmptyIndex, IPageWriteCallback callback) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public int getNumOfFilterFields() {
+            return 0;
+        }
+
+        public boolean isActivated() {
+            return activated;
+        }
+
+        @Override
+        public String toString() {
+            // Ensure this matches exactly the resourcePath used for registration
+            return resourcePath;
+        }
+
+        @Override
+        public boolean isCurrentMutableComponentEmpty() {
+            return true;
+        }
+
+        @Override
+        public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
+                IReplicationJob.ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
+
+        }
+
+        @Override
+        public boolean isMemoryComponentsAllocated() {
+            return false;
+        }
+
+        @Override
+        public void allocateMemoryComponents() throws HyracksDataException {
+
+        }
+
+        @Override
+        public ILSMMemoryComponent getCurrentMemoryComponent() {
+            return null;
+        }
+
+        @Override
+        public int getCurrentMemoryComponentIndex() {
+            return 0;
+        }
+
+        @Override
+        public List<ILSMMemoryComponent> getMemoryComponents() {
+            return List.of();
+        }
+
+        @Override
+        public boolean isDurable() {
+            return false;
+        }
+
+        @Override
+        public void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
+
+        }
+
+        @Override
+        public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public int getNumberOfAllMemoryComponents() {
+            return 0;
+        }
+
+        @Override
+        public ILSMHarness getHarness() {
+            return null;
+        }
+
+        @Override
+        public String getIndexIdentifier() {
+            return "";
+        }
+
+        @Override
+        public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+                boolean checkIfEmptyIndex, Map<String, Object> parameters) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public void resetCurrentComponentIndex() {
+
+        }
+
+        @Override
+        public IIndexDiskCacheManager getDiskCacheManager() {
+            return null;
+        }
+
+        @Override
+        public void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException {
+
+        }
+
+        @Override
+        public boolean isAtomic() {
+            return false;
+        }
+
+        @Override
+        public void commit() throws HyracksDataException {
+
+        }
+
+        @Override
+        public void abort() throws HyracksDataException {
+
+        }
+
+        @Override
+        public ILSMMergePolicy getMergePolicy() {
+            return null;
+        }
+
+        @Override
+        public ILSMOperationTracker getOperationTracker() {
+            // This is the key method that DatasetLifecycleManager calls during open()
+            // to get the operation tracker and register it
+
+            LOGGER.debug("getOperationTracker() called for index: {}", resourcePath);
+
+            // Get dataset ID and partition from the resource path
+            int partition = -1;
+
+            // Extract partition from resource path (storage/partition_X/...)
+            String[] parts = resourcePath.split("/");
+            for (int i = 0; i < parts.length; i++) {
+                if (parts[i].startsWith("partition_")) {
+                    try {
+                        partition = Integer.parseInt(parts[i].substring("partition_".length()));
+                        break;
+                    } catch (NumberFormatException e) {
+                        LOGGER.warn("Failed to parse partition number from {}", parts[i]);
+                    }
+                }
+            }
+
+            if (partition != -1) {
+                // Find the dataset resource from our map
+                DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+                if (datasetResource != null) {
+                    try {
+                        // Access the datasetPrimaryOpTrackers map through reflection
+                        Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+                        opTrackersField.setAccessible(true);
+                        @SuppressWarnings("unchecked")
+                        Map<Integer, ILSMOperationTracker> opTrackers =
+                                (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+                        // Manual registration - simulate what DatasetLifecycleManager would do
+                        if (opTrackers != null && !opTrackers.containsKey(partition)) {
+                            LOGGER.debug(
+                                    "Directly adding tracker to datasetPrimaryOpTrackers for dataset {}, partition {}: {}",
+                                    datasetId, partition, mockTracker);
+                            opTrackers.put(partition, mockTracker);
+                        }
+                    } catch (Exception e) {
+                        LOGGER.error("Failed to access datasetPrimaryOpTrackers via reflection", e);
+                    }
+                }
+            }
+
+            return mockTracker;
+        }
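+
+        // A compact sketch mirroring the partition parsing above, kept separate so the
+        // parsing logic can be read in isolation; assumes paths containing a
+        // "partition_<N>" segment.
+        static int parsePartition(String path) {
+            for (String part : path.split("/")) {
+                if (part.startsWith("partition_")) {
+                    try {
+                        return Integer.parseInt(part.substring("partition_".length()));
+                    } catch (NumberFormatException e) {
+                        return -1;
+                    }
+                }
+            }
+            return -1;
+        }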
+
+        @Override
+        public ILSMIOOperationCallback getIOOperationCallback() {
+            return null;
+        }
+
+        @Override
+        public List<ILSMDiskComponent> getDiskComponents() {
+            return List.of();
+        }
+
+        @Override
+        public boolean isPrimaryIndex() {
+            return false;
+        }
+
+        @Override
+        public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
+
+        }
+
+        @Override
+        public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+                throws HyracksDataException {
+
+        }
+
+        @Override
+        public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
+
+        }
+
+        @Override
+        public ILSMIOOperation createFlushOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public ILSMIOOperation createMergeOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+            return null;
+        }
+
+        @Override
+        public void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException {
+
+        }
+
+        @Override
+        public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+
+        }
+
+        @Override
+        public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
+                throws HyracksDataException {
+
+        }
+
+        @Override
+        public void changeMutableComponent() {
+
+        }
+
+        @Override
+        public void changeFlushStatusForCurrentMutableCompoent(boolean needsFlush) {
+
+        }
+
+        @Override
+        public boolean hasFlushRequestForCurrentMutableComponent() {
+            return false;
+        }
+
+        @Override
+        public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
+
+        }
+
+        @Override
+        public List<ILSMDiskComponent> getInactiveDiskComponents() {
+            return List.of();
+        }
+
+        @Override
+        public void addInactiveDiskComponent(ILSMDiskComponent diskComponent) {
+
+        }
+
+        @Override
+        public List<ILSMMemoryComponent> getInactiveMemoryComponents() {
+            return List.of();
+        }
+
+        @Override
+        public void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent) {
+
+        }
+
+        // All other ILSMIndex methods above are intentionally minimal no-ops.
+
+        /**
+         * Sets the operation tracker for this mock index.
+         * Added to support DatasetLifecycleManagerLazyRecoveryTest use cases.
+         */
+        public void setOperationTracker(PrimaryIndexOperationTracker tracker) {
+            this.mockTracker = tracker;
+        }
+    }
+
+    /**
+     * Enhanced version of the MockIndex implementation that provides better
+     * tracking of operation tracker state changes.
+     */
+    private static class EnhancedMockIndex extends MockIndex {
+        private PrimaryIndexOperationTracker assignedTracker;
+        private final List<String> trackerEvents = new ArrayList<>();
+
+        public EnhancedMockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
+            super(resourcePath, datasetId, datasetResourceMap);
+        }
+
+        @Override
+        public ILSMOperationTracker getOperationTracker() {
+            PrimaryIndexOperationTracker tracker = (PrimaryIndexOperationTracker) super.getOperationTracker();
+
+            if (assignedTracker == null) {
+                assignedTracker = tracker;
+                trackerEvents.add("Initial tracker assignment: " + tracker);
+            } else if (assignedTracker != tracker) {
+                trackerEvents.add("Tracker changed from " + assignedTracker + " to " + tracker);
+                assignedTracker = tracker;
+            }
+
+            return tracker;
+        }
+
+        public List<String> getTrackerEvents() {
+            return trackerEvents;
+        }
+    }
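+
+    // A sketch of how the tracker-event log can be checked after a run: with resource-level
+    // locking, an index is expected to keep its first assigned tracker for its whole lifetime.
+    private static void assertSingleTrackerAssignment(EnhancedMockIndex index) {
+        for (String event : index.getTrackerEvents()) {
+            assertTrue("Tracker should not change once assigned: " + event,
+                    !event.startsWith("Tracker changed"));
+        }
+    }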
+
+    /**
+     * Tests specific race conditions that could lead to null operation trackers during unregistration.
+     * This test verifies:
+     * 1. Concurrent registration, opening, and unregistration don't create null operation trackers
+     * 2. The operation tracker remains valid throughout concurrent operations
+     * 3. Operation tracker cleanup occurs only when appropriate
+     */
+    @Test
+    public void testRaceConditionsWithOperationTracker() throws Exception {
+        LOGGER.info("Starting testRaceConditionsWithOperationTracker");
+
+        // Create a resource path for testing
+        String resourcePath = createdResourcePaths.get(0);
+
+        // Get dataset and partition information for this resource
+        LocalResource resource = mockResources.get(resourcePath);
+        DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+        int datasetId = datasetLocalResource.getDatasetId();
+        int partition = datasetLocalResource.getPartition();
+
+        // Setup an enhanced mock index for this test to track operation tracker events
+        EnhancedMockIndex enhancedMockIndex = new EnhancedMockIndex(resourcePath, datasetId, datasetResourceMap);
+        mockIndexes.put(resourcePath, enhancedMockIndex);
+
+        // Register the resource to make it available
+        IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
+        assertNotNull("Index should be registered successfully", index);
+        assertSame("Should get our enhanced mock index", enhancedMockIndex, index);
+
+        // Create multiple threads that will perform random operations on the same resource
+        final int NUM_CONCURRENT_THREADS = 10;
+        final int OPERATIONS_PER_THREAD = 20;
+        final CyclicBarrier startBarrier = new CyclicBarrier(NUM_CONCURRENT_THREADS);
+        final CountDownLatch completionLatch = new CountDownLatch(NUM_CONCURRENT_THREADS);
+
+        for (int i = 0; i < NUM_CONCURRENT_THREADS; i++) {
+            final int threadId = i;
+            executorService.submit(() -> {
+                try {
+                    startBarrier.await();
+                    Random random = new Random();
+
+                    for (int op = 0; op < OPERATIONS_PER_THREAD; op++) {
+                        // Perform a random operation:
+                        // 0 = check index existence
+                        // 1 = open
+                        // 2 = close
+                        // 3 = register
+                        // 4 = check operation tracker
+                        int operation = random.nextInt(5);
+
+                        switch (operation) {
+                            case 0:
+                                // Check if index exists
+                                IIndex existingIndex = datasetLifecycleManager.get(resourcePath);
+                                LOGGER.debug("Thread {} checked index existence: {}", threadId, existingIndex != null);
+                                break;
+                            case 1:
+                                // Open the index
+                                try {
+                                    datasetLifecycleManager.open(resourcePath);
+                                    LOGGER.debug("Thread {} opened index", threadId);
+                                } catch (Exception e) {
+                                    // This is expected occasionally since the resource might be unregistered
+                                    LOGGER.debug("Thread {} failed to open index: {}", threadId, e.getMessage());
+                                }
+                                break;
+                            case 2:
+                                // Close the index
+                                try {
+                                    datasetLifecycleManager.close(resourcePath);
+                                    LOGGER.debug("Thread {} closed index", threadId);
+                                } catch (Exception e) {
+                                    // This is expected occasionally
+                                    LOGGER.debug("Thread {} failed to close index: {}", threadId, e.getMessage());
+                                }
+                                break;
+                            case 3:
+                                // Register or re-register the index
+                                try {
+                                    IIndex reregisteredIndex =
+                                            datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
+                                    LOGGER.debug("Thread {} registered index: {}", threadId, reregisteredIndex != null);
+                                } catch (Exception e) {
+                                    LOGGER.debug("Thread {} failed to register index: {}", threadId, e.getMessage());
+                                }
+                                break;
+                            case 4:
+                                // Check operation tracker
+                                try {
+                                    // Get the dataset resource
+                                    DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+                                    if (datasetResource != null) {
+                                        // Get and verify the operation tracker
+                                        ILSMOperationTracker tracker = getOperationTracker(datasetResource, partition);
+                                        if (tracker instanceof PrimaryIndexOperationTracker) {
+                                            LOGGER.debug("Thread {} found valid PrimaryIndexOperationTracker: {}",
+                                                    threadId, tracker);
+                                        } else if (tracker != null) {
+                                            LOGGER.warn("Thread {} found non-PrimaryIndexOperationTracker: {}",
+                                                    threadId, tracker.getClass().getName());
+                                        } else {
+                                            LOGGER.debug("Thread {} found no operation tracker", threadId);
+                                        }
+                                    }
+                                } catch (Exception e) {
+                                    LOGGER.debug("Thread {} error checking operation tracker: {}", threadId,
+                                            e.getMessage());
+                                }
+                                break;
+                        }
+
+                        // Small delay to increase chance of race conditions
+                        Thread.sleep(random.nextInt(5));
+                    }
+
+                    LOGGER.debug("Thread {} completed all operations", threadId);
+                    completionLatch.countDown();
+                } catch (Exception e) {
+                    LOGGER.error("Thread {} error during test: {}", threadId, e.getMessage(), e);
+                    completionLatch.countDown();
+                }
+            });
+        }
+
+        // Wait for all threads to complete
+        boolean allCompleted = completionLatch.await(30, TimeUnit.SECONDS);
+        if (!allCompleted) {
+            LOGGER.warn("Not all threads completed in time");
+        }
+
+        // Get the tracker events to verify behavior
+        List<String> trackerEvents = enhancedMockIndex.getTrackerEvents();
+        LOGGER.debug("Tracker events recorded: {}", trackerEvents.size());
+        for (String event : trackerEvents) {
+            LOGGER.debug("Tracker event: {}", event);
+        }
+
+        // Verify the index is still registered at the end
+        IIndex finalIndex = datasetLifecycleManager.get(resourcePath);
+        if (finalIndex != null) {
+            // Get the final operation tracker state if the index is still registered
+            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+            if (datasetResource != null) {
+                ILSMOperationTracker finalTracker = getOperationTracker(datasetResource, partition);
+                LOGGER.debug("Final operation tracker state: {}", finalTracker);
+
+                // If there's a valid tracker, it should be a PrimaryIndexOperationTracker
+                if (finalTracker != null) {
+                    assertTrue("Final tracker should be a PrimaryIndexOperationTracker",
+                            finalTracker instanceof PrimaryIndexOperationTracker);
+                }
+            }
+        }
+
+        LOGGER.info("Completed testRaceConditionsWithOperationTracker");
+    }
+
+    /**
+     * Helper that registers the mock index for the given path, so all tests register
+     * resources through the same code path.
+     */
+    public IIndex registerIfAbsentIndex(String resourcePath) throws HyracksDataException {
+        MockIndex mockIndex = mockIndexes.get(resourcePath);
+        IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
+        LOGGER.debug("Registered {} with mock index {}", resourcePath, mockIndex);
+        return index;
+    }
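+
+    // A minimal sketch of the register/open/close pattern the tests above follow; assumes the
+    // path already has a MockIndex in the mockIndexes map.
+    private void registerOpenClose(String resourcePath) throws HyracksDataException {
+        registerIfAbsentIndex(resourcePath);
+        datasetLifecycleManager.open(resourcePath);
+        datasetLifecycleManager.close(resourcePath);
+    }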
+
+    @Test
+    public void testBalancedOpenCloseUnregister() throws Exception {
+        LOGGER.info("Starting testBalancedOpenCloseUnregister");
+
+        // Select a resource to test with
+        String resourcePath = createdResourcePaths.get(0);
+
+        // Register the index
+        IIndex index = registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully", index);
+        LOGGER.info("Registered index: {}", resourcePath);
+
+        // Number of times to open/close
+        final int numOperations = 5;
+
+        // Open the index multiple times
+        for (int i = 0; i < numOperations; i++) {
+            datasetLifecycleManager.open(resourcePath);
+            LOGGER.info("Opened index {} (#{}/{})", resourcePath, i + 1, numOperations);
+        }
+
+        // Close the index the same number of times
+        for (int i = 0; i < numOperations; i++) {
+            datasetLifecycleManager.close(resourcePath);
+            LOGGER.info("Closed index {} (#{}/{})", resourcePath, i + 1, numOperations);
+        }
+
+        // At this point, reference count should be balanced and unregistration should succeed
+        try {
+            datasetLifecycleManager.unregister(resourcePath);
+            LOGGER.info("Successfully unregistered index {} after balanced open/close operations", resourcePath);
+
+            // After unregistration, simulate real behavior by making the repository return null for this path
+            // This is what would happen in a real environment - the resource would be removed from storage
+            mockResources.remove(resourcePath);
+            LOGGER.info("Removed resource {} from mock resources", resourcePath);
+        } catch (HyracksDataException e) {
+            fail("Failed to unregister index after balanced open/close: " + e.getMessage());
+        }
+
+        // Verify the index is actually unregistered by attempting to get it
+        IIndex retrievedIndex = datasetLifecycleManager.get(resourcePath);
+        assertNull("Index should no longer be registered after unregistration", retrievedIndex);
+
+        // Verify the resource is no longer in our mock repository
+        LocalResource retrievedResource = mockResourceRepository.get(resourcePath);
+        assertNull("Resource should be removed from repository after unregistration", retrievedResource);
+
+        // Try to open the unregistered index - should fail
+        boolean openFailed = false;
+        try {
+            datasetLifecycleManager.open(resourcePath);
+        } catch (HyracksDataException e) {
+            openFailed = true;
+            LOGGER.info("As expected, failed to open unregistered index: {}", e.getMessage());
+            assertTrue("Error should indicate the index doesn't exist",
+                    e.getMessage().contains("Index does not exist"));
+        }
+        assertTrue("Opening an unregistered index should fail", openFailed);
+    }
+
+    // Accessors exposing test internals to companion tests (e.g., DatasetLifecycleManagerLazyRecoveryTest)
+
+    /**
+     * Returns the dataset lifecycle manager for testing
+     */
+    public DatasetLifecycleManager getDatasetLifecycleManager() {
+        return datasetLifecycleManager;
+    }
+
+    /**
+     * Returns the mock service context for testing
+     */
+    public INCServiceContext getMockServiceContext() {
+        return mockServiceContext;
+    }
+
+    /**
+     * Returns the list of created resource paths
+     */
+    public List<String> getCreatedResourcePaths() {
+        return createdResourcePaths;
+    }
+
+    /**
+     * Returns the map of mock resources
+     */
+    public Map<String, LocalResource> getMockResources() {
+        return mockResources;
+    }
+
+    /**
+     * Returns the mock recovery manager
+     */
+    public IRecoveryManager getMockRecoveryManager() {
+        return mockRecoveryManager;
+    }
+
+    /**
+     * Returns the mock index for a path
+     */
+    public MockIndex getMockIndex(String resourcePath) {
+        return mockIndexes.get(resourcePath);
+    }
+
+    /**
+     * Returns the mock resource repository
+     */
+    public ILocalResourceRepository getMockResourceRepository() {
+        return mockResourceRepository;
+    }
+
+    /**
+     * Creates a mock index for a resource path and adds it to the mockIndexes map.
+     * This is used by DatasetLifecycleManagerLazyRecoveryTest to ensure consistent mock creation.
+     */
+    public void createMockIndexForPath(String resourcePath, int datasetId, int partition,
+            PrimaryIndexOperationTracker opTracker) throws HyracksDataException {
+        // Create the mock index bound to the provided dataset id
+        MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
+
+        // Set the operation tracker directly if provided
+        if (opTracker != null) {
+            mockIndex.setOperationTracker(opTracker);
+        }
+
+        // Store the mock index
+        mockIndexes.put(resourcePath, mockIndex);
+
+        // Make the corresponding local resource (if mocked) return this index from createInstance()
+        DatasetLocalResource mockDatasetResource = null;
+        LocalResource localResource = mockResources.get(resourcePath);
+        if (localResource != null) {
+            mockDatasetResource = (DatasetLocalResource) localResource.getResource();
+        }
+
+        if (mockDatasetResource != null) {
+            when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
+        }
+
+        LOGGER.debug("Created mock index for {}: {}", resourcePath, mockIndex);
+    }
+}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
new file mode 100644
index 0000000..a4b5a55
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
@@ -0,0 +1,1223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the behavior of the DatasetLifecycleManager with lazy recovery enabled.
+ * This test class focuses on verifying that lazy recovery works correctly.
+ */
+public class DatasetLifecycleManagerLazyRecoveryTest {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private DatasetLifecycleManagerConcurrentTest concurrentTest;
+    private List<String> recoveredResourcePaths;
+    private Set<String> datasetPartitionPaths;
+    private Map<String, FileReference> pathToFileRefMap;
+    private ExecutorService executorService;
+
+    @Before
+    public void setUp() throws Exception {
+        // Create and initialize the base test
+        concurrentTest = new DatasetLifecycleManagerConcurrentTest();
+        concurrentTest.setUp();
+
+        // Enable lazy recovery
+        concurrentTest.setLazyRecoveryEnabled(true);
+        LOGGER.info("Lazy recovery enabled for all tests");
+
+        // Track recovered resources
+        recoveredResourcePaths = new ArrayList<>();
+        datasetPartitionPaths = new TreeSet<>();
+        pathToFileRefMap = new HashMap<>();
+
+        // Extract the dataset partition paths from the resource paths and log partition info
+        // These are the parent directories that contain multiple index files
+        // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
+        // Note: The replication factor is ALWAYS 0 in production and should be 0 in all tests
+        Pattern pattern = Pattern.compile("(.*?/partition_(\\d+)/[^/]+/[^/]+/[^/]+/(\\d+))/.+");
+
+        // Map to collect resources by partition for logging
+        Map<Integer, List<String>> resourcesByPartition = new HashMap<>();
+
+        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+            Matcher matcher = pattern.matcher(resourcePath);
+            if (matcher.matches()) {
+                String partitionPath = matcher.group(1); // Now includes the replication factor
+                int partitionId = Integer.parseInt(matcher.group(2));
+                int replicationFactor = Integer.parseInt(matcher.group(3)); // Usually 0
+
+                LOGGER.info("Resource path: {}", resourcePath);
+                LOGGER.info("  Partition path: {}", partitionPath);
+                LOGGER.info("  Partition ID: {}", partitionId);
+                LOGGER.info("  Replication factor: {}", replicationFactor);
+
+                // Track resources by partition for logging
+                resourcesByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(resourcePath);
+
+                // Add to our partition paths for mocking
+                datasetPartitionPaths.add(partitionPath);
+
+                // Create a FileReference mock for this partition path
+                FileReference mockPartitionRef = mock(FileReference.class);
+                when(mockPartitionRef.getAbsolutePath()).thenReturn(partitionPath);
+                pathToFileRefMap.put(partitionPath, mockPartitionRef);
+            } else {
+                LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
+            }
+        }
+
+        // Log resources by partition for debugging
+        LOGGER.info("Found {} dataset partition paths", datasetPartitionPaths.size());
+        for (Map.Entry<Integer, List<String>> entry : resourcesByPartition.entrySet()) {
+            LOGGER.info("Partition {}: {} resources", entry.getKey(), entry.getValue().size());
+            for (String path : entry.getValue()) {
+                LOGGER.info("  - {}", path);
+            }
+        }
+
+        // Mock the IOManager.resolve() method to return our mock FileReferences
+        IIOManager mockIOManager = concurrentTest.getMockServiceContext().getIoManager();
+        doAnswer(inv -> {
+            String path = inv.getArgument(0);
+            // For each possible partition path, check if this is the path being resolved
+            for (String partitionPath : datasetPartitionPaths) {
+                if (path.equals(partitionPath)) {
+                    LOGGER.info("Resolving path {} to FileReference", path);
+                    return pathToFileRefMap.get(partitionPath);
+                }
+            }
+            // If no match, create a new mock FileReference
+            FileReference mockFileRef = mock(FileReference.class);
+            when(mockFileRef.getAbsolutePath()).thenReturn(path);
+            when(mockFileRef.toString()).thenReturn(path);
+            return mockFileRef;
+        }).when(mockIOManager).resolve(anyString());
+
+        // Create executor service for concurrent tests
+        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    }
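+
+    /**
+     * Sanity-check sketch for the path pattern used in setUp(): verifies the groups extracted
+     * from a representative resource path. The sample path is illustrative only.
+     */
+    @Test
+    public void testPartitionPathPatternSketch() {
+        Pattern pattern = Pattern.compile("(.*?/partition_(\\d+)/[^/]+/[^/]+/[^/]+/(\\d+))/.+");
+        Matcher matcher = pattern.matcher("storage/partition_1/Default/SDefault/ds0/0/ds0_idx");
+        assertTrue("Sample path should match the partition pattern", matcher.matches());
+        assertEquals("storage/partition_1/Default/SDefault/ds0/0", matcher.group(1));
+        assertEquals("1", matcher.group(2));
+        assertEquals("0", matcher.group(3));
+    }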
+
+    /**
+     * Tests that when an index is opened with lazy recovery enabled, all resources under the
+     * same dataset partition path are recovered together. This verifies the batch recovery
+     * behavior for all resources sharing the same prefix path.
+     */
+    @Test
+    public void testLazyRecoveryForEntirePartition() throws Exception {
+        LOGGER.info("Starting testLazyRecoveryForEntirePartition");
+
+        // Get resources for one dataset partition
+        String datasetPartitionPath = datasetPartitionPaths.iterator().next();
+        List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
+                .filter(path -> path.startsWith(datasetPartitionPath)).collect(Collectors.toList());
+
+        LOGGER.info("Testing partition path: {}", datasetPartitionPath);
+        LOGGER.info("Resources in this partition: {}", resourcesInPartition);
+
+        // Mock the resource repository to track calls to getResources
+        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+        // Setup capture of resources that get recovered - using FileReference
+        doAnswer(inv -> {
+            List<FileReference> refs = inv.getArgument(1);
+            if (refs != null && !refs.isEmpty()) {
+                FileReference fileRef = refs.get(0);
+                String rootPath = fileRef.getAbsolutePath();
+                LOGGER.info("Looking up resources with FileReference: {}", fileRef);
+                LOGGER.info("FileReference absolute path: {}", rootPath);
+
+                // The rootPath from DatasetLifecycleManager now includes the replication factor
+                // e.g., storage/partition_1/Default/SDefault/ds0/0
+                LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
+
+                // Get all resources that match this root path
+                Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+                // Convert our string-keyed map to long-keyed map as expected by the API
+                // Note: we match the exact path or a proper child ("rootPath/..."), not a bare
+                // prefix, because the rootPath includes the replication factor
+                concurrentTest.getMockResources().entrySet().stream()
+                        .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
+                        .forEach(entry -> {
+                            LocalResource resource = entry.getValue();
+                            matchingResources.put(resource.getId(), resource);
+                            // Record which resources were recovered for our test verification
+                            recoveredResourcePaths.add(entry.getKey());
+                            LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
+                                    resource.getId());
+                        });
+
+                LOGGER.info("Found {} resources to recover", matchingResources.size());
+
+                // Return resources mapped by ID as the real implementation would
+                return matchingResources;
+            }
+            return Map.of();
+        }).when(mockRepository).getResources(any(), anyList());
+
+        // Setup capture for recoverIndexes calls
+        doAnswer(inv -> {
+            List<ILSMIndex> indexes = inv.getArgument(0);
+            if (indexes != null && !indexes.isEmpty()) {
+                LOGGER.info("Recovering {} indexes", indexes.size());
+                for (ILSMIndex index : indexes) {
+                    LOGGER.info("Recovering index: {}", index);
+                }
+            }
+            return null;
+        }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
+
+        // Choose the first resource in the partition to register and open
+        String resourcePath = resourcesInPartition.get(0);
+
+        // Register the index
+        IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully", index);
+        LOGGER.info("Successfully registered index at {}", resourcePath);
+
+        // Open the index - should trigger lazy recovery
+        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+        LOGGER.info("Successfully opened index at {}", resourcePath);
+
+        // Verify the recovery manager was consulted
+        verify(concurrentTest.getMockRecoveryManager(), atLeastOnce()).isLazyRecoveryEnabled();
+
+        // Verify that all resources in the partition were considered for recovery
+        LOGGER.info("Verifying recovery for resources in partition: {}", resourcesInPartition);
+        LOGGER.info("Resources marked as recovered: {}", recoveredResourcePaths);
+
+        for (String path : resourcesInPartition) {
+            boolean wasRecovered = isResourcePathRecovered(path);
+            LOGGER.info("Resource {} - recovered: {}", path, wasRecovered);
+            assertTrue("Resource should have been considered for recovery: " + path, wasRecovered);
+        }
+
+        // Cleanup
+        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+        concurrentTest.getMockResources().remove(resourcePath);
+
+        // Verify cleanup was successful
+        assertNull("Index should no longer be registered",
+                concurrentTest.getDatasetLifecycleManager().get(resourcePath));
+    }
+
+    /**
+     * Tests that lazy recovery is only performed the first time an index is opened
+     * and is skipped on subsequent opens.
+     */
+    @Test
+    public void testLazyRecoveryOnlyHappensOnce() throws Exception {
+        LOGGER.info("Starting testLazyRecoveryOnlyHappensOnce");
+
+        // Get a resource path
+        String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
+        LOGGER.info("Testing with resource: {}", resourcePath);
+
+        // Find the dataset partition path for this resource
+        String partitionPath = null;
+        for (String path : datasetPartitionPaths) {
+            if (resourcePath.startsWith(path)) {
+                partitionPath = path;
+                break;
+            }
+        }
+        LOGGER.info("Resource belongs to partition: {}", partitionPath);
+
+        // Track recovery calls
+        List<String> recoveredPaths = new ArrayList<>();
+
+        // Mock repository to track resource lookups during recovery
+        final Map<Integer, Integer> recoveryCount = new HashMap<>();
+        recoveryCount.put(0, 0); // First recovery count
+        recoveryCount.put(1, 0); // Second recovery count
+
+        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+        // Setup capture of resources that get recovered
+        doAnswer(inv -> {
+            List<FileReference> refs = inv.getArgument(1);
+            if (refs != null && !refs.isEmpty()) {
+                FileReference fileRef = refs.get(0);
+                String rootPath = fileRef.getAbsolutePath();
+
+                // Get all resources that match this root path
+                Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+                concurrentTest.getMockResources().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
+                            LocalResource resource = entry.getValue();
+                            matchingResources.put(resource.getId(), resource);
+                            recoveredPaths.add(entry.getKey());
+                        });
+
+                // Attribute the recovered resources to whichever open is currently in progress
+                int phase = openPhase[0];
+                recoveryCount.put(phase, recoveryCount.get(phase) + matchingResources.size());
+                LOGGER.info("Open #{} recovery finding {} resources", phase + 1, matchingResources.size());
+
+                return matchingResources;
+            }
+            return Map.of();
+        }).when(mockRepository).getResources(any(), anyList());
+
+        // Mock recovery behavior to track calls
+        doAnswer(inv -> {
+            List<ILSMIndex> indexes = inv.getArgument(0);
+            if (indexes != null && !indexes.isEmpty()) {
+                for (ILSMIndex index : indexes) {
+                    String path = index.toString();
+                    LOGGER.info("Recovering index: {}", path);
+                }
+            }
+            return null;
+        }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
+
+        // Register the index
+        IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully", index);
+
+        // First open - should trigger recovery
+        LOGGER.info("First open of index");
+        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+        LOGGER.info("First open recovery found {} resources", recoveryCount.get(0));
+
+        // Close the index
+        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+
+        // Second open - should NOT trigger recovery again
+        LOGGER.info("Second open of index");
+        openPhase[0] = 1; // Recoveries from this point count toward the second open
+        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+        LOGGER.info("Second open recovery found {} resources", recoveryCount.get(1));
+
+        // Verify recovery was skipped on second open
+        assertTrue("First open should recover resources", recoveryCount.get(0) > 0);
+        assertTrue("Recovery should not happen on subsequent opens",
+                recoveryCount.get(1) == 0 || recoveryCount.get(1) < recoveryCount.get(0));
+
+        // Cleanup
+        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+        concurrentTest.getMockResources().remove(resourcePath);
+    }
+
+    /**
+     * A comprehensive test that runs multiple operations with lazy recovery enabled.
+     * This test verifies that lazy recovery works correctly across various scenarios:
+     * - Opening multiple resources in the same partition
+     * - Operations across multiple partitions
+     * - Reference counting with multiple open/close
+     * - Proper unregistration after use
+     */
+    @Test
+    public void testComprehensiveLazyRecoveryWithMultipleResources() throws Exception {
+        LOGGER.info("Starting testComprehensiveLazyRecoveryWithMultipleResources");
+
+        // Create sets to track which resources have been recovered
+        Set<String> recoveredResources = new TreeSet<>();
+        Set<String> recoveredPartitions = new TreeSet<>();
+
+        // Mock repository to track resource lookups during recovery
+        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+        // Setup capture of resources that get recovered
+        doAnswer(inv -> {
+            List<FileReference> refs = inv.getArgument(1);
+            if (refs != null && !refs.isEmpty()) {
+                FileReference fileRef = refs.get(0);
+                String rootPath = fileRef.getAbsolutePath();
+                LOGGER.info("Looking up resources with root path: {}", rootPath);
+
+                // Track this partition as being recovered
+                recoveredPartitions.add(rootPath);
+
+                // Get all resources that match this root path
+                Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+                concurrentTest.getMockResources().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
+                            LocalResource resource = entry.getValue();
+                            matchingResources.put(resource.getId(), resource);
+                            recoveredResources.add(entry.getKey());
+                            LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
+                                    resource.getId());
+                        });
+
+                return matchingResources;
+            }
+            return Map.of();
+        }).when(mockRepository).getResources(any(), anyList());
+
+        // Group resources by partition for easier testing
+        Map<String, List<String>> resourcesByPartition = new HashMap<>();
+
+        for (String partitionPath : datasetPartitionPaths) {
+            List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
+                    .filter(path -> path.startsWith(partitionPath)).collect(Collectors.toList());
+
+            resourcesByPartition.put(partitionPath, resourcesInPartition);
+        }
+
+        LOGGER.info("Found {} partitions with resources", resourcesByPartition.size());
+
+        // For each partition, perform a series of operations
+        for (Map.Entry<String, List<String>> entry : resourcesByPartition.entrySet()) {
+            String partitionPath = entry.getKey();
+            List<String> resources = entry.getValue();
+
+            LOGGER.info("Testing partition: {} with {} resources", partitionPath, resources.size());
+
+            // 1. Register and open the first resource - should trigger recovery for all resources in this partition
+            String firstResource = resources.get(0);
+            LOGGER.info("Registering and opening first resource: {}", firstResource);
+
+            IIndex firstIndex = concurrentTest.registerIfAbsentIndex(firstResource);
+            assertNotNull("First index should be registered successfully", firstIndex);
+
+            // Open the index - should trigger lazy recovery for all resources in this partition
+            concurrentTest.getDatasetLifecycleManager().open(firstResource);
+
+            // Verify partition was recovered
+            assertTrue("Partition should be marked as recovered", recoveredPartitions.contains(partitionPath));
+
+            // 2. Register and open a second resource if available - should NOT trigger recovery again
+            if (resources.size() > 1) {
+                // Clear recovery tracking before opening second resource
+                int beforeSize = recoveredResources.size();
+                String secondResource = resources.get(1);
+
+                LOGGER.info("Registering and opening second resource: {}", secondResource);
+                IIndex secondIndex = concurrentTest.registerIfAbsentIndex(secondResource);
+                assertNotNull("Second index should be registered successfully", secondIndex);
+
+                // Open the second index - should NOT trigger lazy recovery again for this partition
+                concurrentTest.getDatasetLifecycleManager().open(secondResource);
+
+                // Verify no additional recovery happened
+                int afterSize = recoveredResources.size();
+                LOGGER.info("Resources recovered before: {}, after: {}", beforeSize, afterSize);
+                // We might see some recovery of the specific resource, but not the whole partition again
+
+                // 3. Open and close the second resource multiple times to test reference counting
+                for (int i = 0; i < 3; i++) {
+                    concurrentTest.getDatasetLifecycleManager().close(secondResource);
+                    concurrentTest.getDatasetLifecycleManager().open(secondResource);
+                }
+
+                // Close and unregister the second resource
+                concurrentTest.getDatasetLifecycleManager().close(secondResource);
+                concurrentTest.getDatasetLifecycleManager().unregister(secondResource);
+                LOGGER.info("Successfully closed and unregistered second resource: {}", secondResource);
+            }
+
+            // 4. Close and unregister the first resource
+            concurrentTest.getDatasetLifecycleManager().close(firstResource);
+            concurrentTest.getDatasetLifecycleManager().unregister(firstResource);
+            LOGGER.info("Successfully closed and unregistered first resource: {}", firstResource);
+
+            // Verify both resources are truly unregistered
+            for (String resource : resources.subList(0, Math.min(2, resources.size()))) {
+                assertNull("Resource should be unregistered: " + resource,
+                        concurrentTest.getDatasetLifecycleManager().get(resource));
+            }
+        }
+
+        // Summary of recovery
+        LOGGER.info("Recovered {} partitions out of {}", recoveredPartitions.size(), datasetPartitionPaths.size());
+        LOGGER.info("Recovered {} resources out of {}", recoveredResources.size(),
+                concurrentTest.getCreatedResourcePaths().size());
+    }
+
+    /**
+     * Tests that open operation succeeds after unregister + register sequence.
+     * This verifies the operation tracker is properly reset when re-registering.
+     */
+    @Test
+    public void testUnregisterRegisterIfAbsentOpenSequence() throws Exception {
+        LOGGER.info("Starting testUnregisterRegisterOpenSequence");
+
+        // Get a resource path
+        String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
+        LOGGER.info("Testing with resource: {}", resourcePath);
+
+        // First lifecycle - register, open, close, unregister
+        IIndex firstIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully in first lifecycle", firstIndex);
+
+        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+        LOGGER.info("Successfully opened index in first lifecycle");
+
+        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+        LOGGER.info("Successfully closed and unregistered index in first lifecycle");
+
+        // Second lifecycle - register again, open, close, unregister
+        IIndex secondIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
+        assertNotNull("Index should be registered successfully in second lifecycle", secondIndex);
+
+        // This open should succeed if the operation tracker was properly reset
+        try {
+            concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+            LOGGER.info("Successfully opened index in second lifecycle");
+        } catch (Exception e) {
+            LOGGER.error("Failed to open index in second lifecycle", e);
+            fail("Open should succeed in second lifecycle: " + e.getMessage());
+        }
+
+        // Cleanup
+        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+        LOGGER.info("Successfully completed two full lifecycles with the same resource");
+    }
+
+    /**
+     * Tests that recovery works correctly across different partitions.
+     * This test specifically verifies that partition IDs are correctly propagated
+     * during the recovery process.
+     */
+    @Test
+    public void testRecoveryAcrossPartitions() throws Exception {
+        LOGGER.info("Starting testRecoveryAcrossPartitions");
+
+        // Skip test if we don't have at least 2 partitions
+        if (datasetPartitionPaths.size() < 2) {
+            LOGGER.info("Skipping testRecoveryAcrossPartitions - need at least 2 partitions");
+            return;
+        }
+
+        // Create maps to track recovery by partition
+        Map<Integer, Boolean> partitionRecovered = new HashMap<>();
+        Map<Integer, Set<String>> resourcesRecoveredByPartition = new HashMap<>();
+
+        // Find resources from different partitions
+        Map<Integer, String> resourcesByPartition = new HashMap<>();
+        // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
+        Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
+
+        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+            Matcher matcher = pattern.matcher(resourcePath);
+            if (matcher.matches()) {
+                int partitionId = Integer.parseInt(matcher.group(1));
+                resourcesByPartition.putIfAbsent(partitionId, resourcePath);
+                partitionRecovered.put(partitionId, false);
+                resourcesRecoveredByPartition.put(partitionId, new TreeSet<>());
+            }
+        }
+
+        LOGGER.info("Found resources in {} different partitions: {}", resourcesByPartition.size(),
+                resourcesByPartition.keySet());
+
+        // Mock repository to track which partition's resources are being recovered
+        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+        // Setup capture of resources that get recovered
+        doAnswer(inv -> {
+            List<FileReference> refs = inv.getArgument(1);
+            if (refs != null && !refs.isEmpty()) {
+                FileReference fileRef = refs.get(0);
+                String rootPath = fileRef.getAbsolutePath();
+                LOGGER.info("Looking up resources with root path: {}", rootPath);
+
+                // The rootPath now includes the replication factor
+                LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
+
+                // Get the partition ID from the path
+                Matcher matcher = pattern.matcher(rootPath);
+                if (matcher.matches()) {
+                    int partitionId = Integer.parseInt(matcher.group(1));
+                    partitionRecovered.put(partitionId, true);
+                    LOGGER.info("Recovery requested for partition {}", partitionId);
+                } else {
+                    LOGGER.warn("Could not extract partition ID from path: {}", rootPath);
+                }
+
+                // Get all resources that match this root path
+                Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+                concurrentTest.getMockResources().entrySet().stream()
+                        .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
+                        .forEach(entry -> {
+                            LocalResource resource = entry.getValue();
+                            matchingResources.put(resource.getId(), resource);
+
+                            // Extract partition ID from path
+                            Matcher m = pattern.matcher(entry.getKey());
+                            if (m.matches()) {
+                                int partitionId = Integer.parseInt(m.group(1));
+                                resourcesRecoveredByPartition.get(partitionId).add(entry.getKey());
+                                LOGGER.info("Resource {} added to recovery for partition {}", entry.getKey(),
+                                        partitionId);
+                            }
+                        });
+
+                LOGGER.info("Found {} resources to recover", matchingResources.size());
+                return matchingResources;
+            }
+            return Map.of();
+        }).when(mockRepository).getResources(any(), anyList());
+
+        // Open resources from each partition
+        for (Map.Entry<Integer, String> entry : resourcesByPartition.entrySet()) {
+            int partitionId = entry.getKey();
+            String resourcePath = entry.getValue();
+
+            LOGGER.info("Testing recovery for partition {} with resource {}", partitionId, resourcePath);
+
+            // Register and open the resource
+            IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+            assertNotNull("Index should be registered successfully", index);
+
+            // Get the LocalResource and verify its partition ID
+            LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+            int resourcePartitionId = datasetLocalResource.getPartition();
+
+            LOGGER.info("Resource {} has partition ID {} (expected {})", resourcePath, resourcePartitionId,
+                    partitionId);
+
+            // Open the index - should trigger lazy recovery for this partition
+            concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+            LOGGER.info("Successfully opened index for partition {}", partitionId);
+
+            // Verify this partition was recovered
+            assertTrue("Partition " + partitionId + " should be marked as recovered",
+                    partitionRecovered.get(partitionId));
+
+            // Verify resources for this partition were recovered
+            Set<String> recoveredResources = resourcesRecoveredByPartition.get(partitionId);
+            assertTrue("At least one resource should be recovered for partition " + partitionId,
+                    !recoveredResources.isEmpty());
+
+            // Cleanup
+            concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+            concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+        }
+
+        // Summary of recovery by partition
+        for (Map.Entry<Integer, Boolean> entry : partitionRecovered.entrySet()) {
+            int partitionId = entry.getKey();
+            boolean recovered = entry.getValue();
+            int resourceCount = resourcesRecoveredByPartition.get(partitionId).size();
+
+            LOGGER.info("Partition {}: Recovered={}, Resources recovered={}", partitionId, recovered, resourceCount);
+        }
+    }
+
+    /**
+     * Tests that the partition ID in DatasetLocalResource objects is correctly set.
+     * This test specifically verifies that partition IDs match the resource path.
+     */
+    @Test
+    public void testPartitionIdMatchesPath() throws Exception {
+        LOGGER.info("Starting testPartitionIdMatchesPath");
+
+        // Find resources from different partitions
+        Map<Integer, Set<String>> resourcesByPartition = new HashMap<>();
+        Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
+
+        // First, collect all resources by partition
+        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+            Matcher matcher = pattern.matcher(resourcePath);
+            if (matcher.matches()) {
+                int partitionId = Integer.parseInt(matcher.group(1));
+                resourcesByPartition.computeIfAbsent(partitionId, k -> new TreeSet<>()).add(resourcePath);
+            }
+        }
+
+        boolean incorrectPartitionIdFound = false;
+
+        // Examine each resource and check its partition ID
+        for (Map.Entry<Integer, Set<String>> entry : resourcesByPartition.entrySet()) {
+            int expectedPartitionId = entry.getKey();
+            Set<String> resources = entry.getValue();
+
+            LOGGER.info("Checking {} resources for partition {}", resources.size(), expectedPartitionId);
+
+            for (String resourcePath : resources) {
+                LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
+                if (localResource != null) {
+                    DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+                    int actualPartitionId = datasetLocalResource.getPartition();
+
+                    LOGGER.info("Resource: {}, Expected partition: {}, Actual partition: {}", resourcePath,
+                            expectedPartitionId, actualPartitionId);
+
+                    if (expectedPartitionId != actualPartitionId) {
+                        LOGGER.error("PARTITION ID MISMATCH for resource {}! Expected: {}, Actual: {}", resourcePath,
+                                expectedPartitionId, actualPartitionId);
+                        incorrectPartitionIdFound = true;
+                    }
+                } else {
+                    LOGGER.warn("Resource not found in mockResources: {}", resourcePath);
+                }
+            }
+        }
+
+        // This assertion will fail if any partition ID is incorrect
+        assertFalse("Incorrect partition IDs found in resources", incorrectPartitionIdFound);
+
+        // Now test DatasetLifecycleManager.getDatasetLocalResource directly
+        LOGGER.info("Testing DatasetLifecycleManager.getDatasetLocalResource directly");
+
+        // Find a resource in partition 1 (if available)
+        Set<String> partition1Resources = resourcesByPartition.getOrDefault(1, Collections.emptySet());
+        if (!partition1Resources.isEmpty()) {
+            String resourcePath = partition1Resources.iterator().next();
+            LOGGER.info("Testing getDatasetLocalResource with partition 1 resource: {}", resourcePath);
+
+            // Register the resource
+            IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+            assertNotNull("Index should be registered successfully", index);
+
+            // Directly access the DatasetLocalResource to check partition ID
+            // We'll need to use reflection since getDatasetLocalResource is private
+            try {
+                // Get the private method
+                Method getDatasetLocalResourceMethod =
+                        DatasetLifecycleManager.class.getDeclaredMethod("getDatasetLocalResource", String.class);
+                getDatasetLocalResourceMethod.setAccessible(true);
+
+                // Invoke the method
+                DatasetLocalResource result = (DatasetLocalResource) getDatasetLocalResourceMethod
+                        .invoke(concurrentTest.getDatasetLifecycleManager(), resourcePath);
+
+                assertNotNull("DatasetLocalResource should not be null", result);
+                int actualPartitionId = result.getPartition();
+                LOGGER.info("getDatasetLocalResource for {} returned partition ID: {}", resourcePath,
+                        actualPartitionId);
+                assertEquals("Partition ID should be 1", 1, actualPartitionId);
+            } catch (Exception e) {
+                LOGGER.error("Error accessing getDatasetLocalResource method", e);
+                fail("Failed to access getDatasetLocalResource method: " + e.getMessage());
+            }
+
+            // Clean up
+            concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+        } else {
+            LOGGER.info("No resources found in partition 1, skipping direct test");
+        }
+    }
+
+    /**
+     * Tests that the mock resources are correctly set up with the proper partition IDs.
+     * This test directly examines the mockResources map to verify the mocks.
+     */
+    @Test
+    public void testMockResourcesSetup() throws Exception {
+        LOGGER.info("Starting testMockResourcesSetup");
+
+        // Extract partition IDs from paths and compare with DatasetLocalResource partitions
+        Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
+        int correctResources = 0;
+        int incorrectResources = 0;
+
+        LOGGER.info("Total mock resources: {}", concurrentTest.getMockResources().size());
+
+        // Print out all setupMockResource calls that would be needed to recreate the resources
+        for (Map.Entry<String, LocalResource> entry : concurrentTest.getMockResources().entrySet()) {
+            String resourcePath = entry.getKey();
+            LocalResource localResource = entry.getValue();
+
+            // Extract the expected partition ID from the path
+            Matcher matcher = pattern.matcher(resourcePath);
+            if (matcher.matches()) {
+                int expectedPartitionId = Integer.parseInt(matcher.group(1));
+
+                // Get the actual partition ID from the resource
+                Object resourceObj = localResource.getResource();
+                if (resourceObj instanceof DatasetLocalResource) {
+                    DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resourceObj;
+                    int actualPartitionId = datasetLocalResource.getPartition();
+                    int datasetId = datasetLocalResource.getDatasetId();
+                    long resourceId = localResource.getId();
+
+                    // Log resource details
+                    LOGGER.info(
+                            "Resource: {}, Expected Partition: {}, Actual Partition: {}, Dataset ID: {}, Resource ID: {}",
+                            resourcePath, expectedPartitionId, actualPartitionId, datasetId, resourceId);
+
+                    // Generate the setupMockResource call that would create this resource
+                    LOGGER.info("setupMockResource(\"{}\", {}, {}, {});", resourcePath, datasetId, expectedPartitionId,
+                            resourceId);
+
+                    if (expectedPartitionId == actualPartitionId) {
+                        correctResources++;
+                    } else {
+                        incorrectResources++;
+                        LOGGER.error("PARTITION MISMATCH in mock resources: {} (expected: {}, actual: {})",
+                                resourcePath, expectedPartitionId, actualPartitionId);
+                    }
+                } else {
+                    LOGGER.warn("Resource object is not a DatasetLocalResource: {}", resourceObj);
+                }
+            } else {
+                LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
+            }
+        }
+
+        LOGGER.info("Resources with correct partition IDs: {}", correctResources);
+        LOGGER.info("Resources with incorrect partition IDs: {}", incorrectResources);
+
+        // Fail the test if any resources have incorrect partition IDs
+        assertEquals("All resources should have correct partition IDs", 0, incorrectResources);
+
+        // Now test a direct mock creation and retrieval to see if that works correctly
+        LOGGER.info("Testing direct mock creation and retrieval");
+
+        // Create a test mock resource with partition 1
+        String testPath = "test/partition_1/Default/SDefault/testDs/0/testIndex"; // 0 is replication factor (always 0)
+        int testDatasetId = 999;
+        int testPartition = 1;
+        long testResourceId = 12345;
+
+        // Create a mock dataset local resource
+        DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
+        when(mockDatasetResource.getDatasetId()).thenReturn(testDatasetId);
+        when(mockDatasetResource.getPartition()).thenReturn(testPartition);
+
+        // Create a mock ILSMIndex (not just IIndex) since recovery expects ILSMIndex
+        ILSMIndex mockIndex = mock(ILSMIndex.class);
+        when(mockIndex.toString()).thenReturn(testPath);
+
+        // Add necessary behavior for LSM operations
+        when(mockIndex.isDurable()).thenReturn(true);
+        when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+
+        // Allow this resource to be created
+        when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
+
+        // Create a mock local resource
+        LocalResource mockLocalResource = mock(LocalResource.class);
+        when(mockLocalResource.getId()).thenReturn(testResourceId);
+        when(mockLocalResource.getPath()).thenReturn(testPath);
+        when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
+
+        // Add to mock resources map
+        concurrentTest.getMockResources().put(testPath, mockLocalResource);
+
+        // Create a mock file reference
+        FileReference mockFileRef = mock(FileReference.class);
+        when(mockFileRef.getRelativePath()).thenReturn(testPath);
+        when(mockFileRef.getAbsolutePath()).thenReturn(testPath);
+        when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(testPath))
+                .thenReturn(mockFileRef);
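+
+        // Sanity-check the wiring just created: the resource should be retrievable from
+        // the mock map with the partition ID we assigned (a local check only; it does
+        // not exercise recovery itself)
+        LocalResource retrieved = concurrentTest.getMockResources().get(testPath);
+        assertNotNull("Directly created mock resource should be retrievable", retrieved);
+        assertEquals("Retrieved mock should report partition 1", testPartition,
+                ((DatasetLocalResource) retrieved.getResource()).getPartition());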
+    }
+
+    /**
+     * Creates a mock resource for testing.
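+     * <p>Example (hypothetical values): {@code setupMockResourceForTest(
+     * "storage/partition_2/Default/SDefault/ds42/0/ds42_idx", 42, 2, 42002L)}
+     * would create a mock resource for dataset 42 in partition 2.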
+     */
+    private void setupMockResourceForTest(String resourcePath, int datasetId, int partition, long resourceId)
+            throws HyracksDataException {
+        // Create a mock dataset local resource
+        DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
+        when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
+        when(mockDatasetResource.getPartition()).thenReturn(partition);
+
+        // Ensure we have a dataset resource
+        DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
+
+        // Create op tracker if needed
+        PrimaryIndexOperationTracker opTracker = datasetResource.getOpTracker(partition);
+        if (opTracker == null) {
+            opTracker = mock(PrimaryIndexOperationTracker.class);
+            datasetResource.setPrimaryIndexOperationTracker(partition, opTracker);
+            LOGGER.info("Created operation tracker for dataset {} partition {}", datasetId, partition);
+        }
+
+        // Create a mock local resource
+        LocalResource mockLocalResource = mock(LocalResource.class);
+        when(mockLocalResource.getId()).thenReturn(resourceId);
+        when(mockLocalResource.getPath()).thenReturn(resourcePath);
+        when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
+
+        // Add to mock resources map
+        concurrentTest.getMockResources().put(resourcePath, mockLocalResource);
+
+        // Create a mock file reference
+        FileReference mockFileRef = mock(FileReference.class);
+        when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
+        when(mockFileRef.getAbsolutePath()).thenReturn(resourcePath);
+        when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(resourcePath))
+                .thenReturn(mockFileRef);
+
+        // Ensure the mock index is the one that will be returned during registration
+        concurrentTest.createMockIndexForPath(resourcePath, datasetId, partition, opTracker);
+    }
+
+    /**
+     * Tests that unregister operation succeeds during lazy recovery.
+     * This test verifies that resources are correctly unregistered during lazy recovery.
+     */
+    @Test
+    public void testConcurrentUnregisterDuringLazyRecovery() throws Exception {
+        LOGGER.info("Starting testConcurrentUnregisterDuringLazyRecovery");
+
+        // We need at least 6 resources for a good test
+        final int REQUIRED_RESOURCES = 6;
+
+        // Get multiple resources from the same dataset and partition
+        List<String> resourcesInSamePartition = findResourcesInSamePartition(REQUIRED_RESOURCES);
+
+        // If we don't have enough resources, create more
+        if (resourcesInSamePartition.size() < REQUIRED_RESOURCES) {
+            LOGGER.info("Only found {} resources, creating additional resources", resourcesInSamePartition.size());
+
+            // Get dataset and partition info from the existing resources, or create new ones
+            int datasetId;
+            int partitionId;
+            String datasetName;
+
+            if (!resourcesInSamePartition.isEmpty()) {
+                // Use the same dataset and partition as existing resources
+                String existingResource = resourcesInSamePartition.get(0);
+                LocalResource localResource = concurrentTest.getMockResources().get(existingResource);
+                DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+                datasetId = datasetLocalResource.getDatasetId();
+                partitionId = datasetLocalResource.getPartition();
+
+                // Extract dataset name from the existing resource path
+                Pattern datasetPattern = Pattern.compile(".*?/partition_\\d+/[^/]+/[^/]+/([^/]+)/\\d+/.*");
+                Matcher matcher = datasetPattern.matcher(existingResource);
+                if (matcher.matches()) {
+                    datasetName = matcher.group(1);
+                    LOGGER.info("Using dataset name from existing resource: {}", datasetName);
+                } else {
+                    datasetName = "testDs";
+                    LOGGER.warn("Could not extract dataset name from {}, using default: {}", existingResource,
+                            datasetName);
+                }
+            } else {
+                // Create new dataset and partition
+                datasetId = 999; // Use a unique dataset ID
+                partitionId = 1; // Use partition 1
+                datasetName = "testDs";
+            }
+
+            LOGGER.info("Creating resources for dataset {} partition {}", datasetId, partitionId);
+
+            // Create additional resources until we reach the required count
+            for (int i = resourcesInSamePartition.size(); i < REQUIRED_RESOURCES; i++) {
+                String indexName = "test_index_" + i;
+                // Use the dataset name extracted from existing resources
+                String resourcePath = String.format("%s/partition_%d/%s/%s/%s/0/%s", "storage", partitionId, "Default",
+                        "SDefault", datasetName, indexName);
+
+                // Create new mock resources
+                setupMockResourceForTest(resourcePath, datasetId, partitionId, datasetId * 1000L + i);
+                resourcesInSamePartition.add(resourcePath);
+                // LOGGER.info("Created additional resource: {}", resourcePath);
+            }
+        }
+
+        LOGGER.info("Have {} resources for test", resourcesInSamePartition.size());
+
+        // Register ALL resources
+        List<String> registeredResources = new ArrayList<>();
+        for (String resourcePath : resourcesInSamePartition) {
+            IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+            assertNotNull("Index should be registered successfully: " + resourcePath, index);
+            registeredResources.add(resourcePath);
+            LOGGER.info("Registered resource: {}", resourcePath);
+        }
+
+        // We'll use 1/3 of resources for unregistering
+        int unregisterCount = Math.max(2, resourcesInSamePartition.size() / 3);
+
+        // Split resources: one for opening, several for unregistering, rest for recovery
+        String resourceForOpen = registeredResources.get(0);
+        List<String> resourcesToUnregister = registeredResources.subList(1, 1 + unregisterCount);
+        List<String> otherResources = registeredResources.subList(1 + unregisterCount, registeredResources.size());
+
+        LOGGER.info("Using {} resources: 1 for open, {} for unregistering, {} for verification",
+                registeredResources.size(), resourcesToUnregister.size(), otherResources.size());
+
+        // Get the dataset and partition ID for tracking
+        LocalResource localResourceA = concurrentTest.getMockResources().get(resourceForOpen);
+        DatasetLocalResource datasetLocalResourceA = (DatasetLocalResource) localResourceA.getResource();
+        int datasetId = datasetLocalResourceA.getDatasetId();
+        int partitionId = datasetLocalResourceA.getPartition();
+
+        LOGGER.info("Test using dataset {} partition {}", datasetId, partitionId);
+
+        // Set up mock for getResources to return actual resources during recovery
+        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+        doAnswer(inv -> {
+            List<FileReference> refs = inv.getArgument(1);
+            if (refs != null && !refs.isEmpty()) {
+                // Create a map of resources to return
+                Map<Long, LocalResource> result = new HashMap<>();
+                // Add our test resources from the same partition
+                for (String path : registeredResources) {
+                    LocalResource res = concurrentTest.getMockResources().get(path);
+                    if (res != null) {
+                        result.put(res.getId(), res);
+                        LOGGER.info("Adding resource to recovery batch: {}", path);
+                    }
+                }
+                return result;
+            }
+            return Collections.emptyMap();
+        }).when(mockRepository).getResources(any(), any());
+
+        // Ensure operation tracker is properly initialized for the dataset resources
+        DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
+        if (datasetResource.getOpTracker(partitionId) == null) {
+            PrimaryIndexOperationTracker opTracker = mock(PrimaryIndexOperationTracker.class);
+            datasetResource.setPrimaryIndexOperationTracker(partitionId, opTracker);
+            LOGGER.info("Created and set operation tracker for dataset {} partition {}", datasetId, partitionId);
+        }
+
+        // Verify operation tracker exists before proceeding
+        assertNotNull("Operation tracker should be initialized for partition " + partitionId,
+                datasetResource.getOpTracker(partitionId));
+
+        // Execute operations with controlled timing
+        CountDownLatch recoveryStartedLatch = new CountDownLatch(1);
+        CountDownLatch unregisterCompleteLatch = new CountDownLatch(1);
+
+        // Mock the recovery to signal when it starts
+        IRecoveryManager mockRecoveryManager = concurrentTest.getMockRecoveryManager();
+        doAnswer(invocation -> {
+            List<ILSMIndex> indexes = invocation.getArgument(0);
+            LOGGER.info("Recovery started with {} indexes", indexes.size());
+
+            // Signal that recovery has started
+            recoveryStartedLatch.countDown();
+
+            // Wait for unregister to complete before continuing
+            LOGGER.info("Waiting for unregister to complete");
+            unregisterCompleteLatch.await(10, TimeUnit.SECONDS);
+            LOGGER.info("Continuing with recovery after unregister");
+
+            // Mark resources as recovered
+            for (ILSMIndex index : indexes) {
+                String path = index.toString();
+                recoveredResourcePaths.add(path);
+            }
+            LOGGER.info("Marked {} resources as recovered", indexes.size());
+
+            return null;
+        }).when(mockRecoveryManager).recoverIndexes(anyList());
+
+        // Thread 1: Open resource for open (will trigger recovery for all)
+        Future<?> openFuture = executorService.submit(() -> {
+            try {
+                LOGGER.info("Thread 1: Opening resource to trigger recovery...");
+                concurrentTest.getDatasetLifecycleManager().open(resourceForOpen);
+                LOGGER.info("Thread 1: Successfully opened resource");
+            } catch (Exception e) {
+                LOGGER.error("Thread 1: Error opening resource", e);
+            }
+        });
+
+        // Wait for recovery to start
+        LOGGER.info("Waiting for recovery to start");
+        boolean recoveryStarted = recoveryStartedLatch.await(10, TimeUnit.SECONDS);
+        assertTrue("Recovery should have started", recoveryStarted);
+        LOGGER.info("Recovery has started");
+
+        // Thread 2: Unregister multiple resources while recovery is happening
+        Future<?> unregisterFuture = executorService.submit(() -> {
+            try {
+                LOGGER.info("Thread 2: Unregistering {} resources...", resourcesToUnregister.size());
+
+                for (String resourcePath : resourcesToUnregister) {
+                    concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+                }
+
+                // Signal that unregister is complete
+                unregisterCompleteLatch.countDown();
+                LOGGER.info("Thread 2: Successfully unregistered all resources");
+            } catch (Exception e) {
+                LOGGER.error("Thread 2: Error unregistering resources", e);
+                // Ensure we don't deadlock in case of failure
+                unregisterCompleteLatch.countDown();
+            }
+        });
+
+        // Wait for unregister to complete
+        unregisterFuture.get(50, TimeUnit.SECONDS);
+
+        // Wait for open to complete
+        openFuture.get(10, TimeUnit.SECONDS);
+
+        // Verify expectations
+        LOGGER.info("Checking final state");
+
+        // Check which resources were recovered
+        List<String> recoveredResources = new ArrayList<>();
+        List<String> nonRecoveredResources = new ArrayList<>();
+
+        for (String resourcePath : registeredResources) {
+            if (isResourcePathRecovered(resourcePath)) {
+                recoveredResources.add(resourcePath);
+            } else {
+                nonRecoveredResources.add(resourcePath);
+            }
+        }
+
+        LOGGER.info("Results: {} resources recovered, {} resources not recovered", recoveredResources.size(),
+                nonRecoveredResources.size());
+
+        // The opened resource should have gone through lazy recovery
+        assertTrue("Resource for open should be recovered", isResourcePathRecovered(resourceForOpen));
+
+        // Resources to unregister should be unregistered
+        for (String resourcePath : resourcesToUnregister) {
+            IIndex indexAfter = concurrentTest.getDatasetLifecycleManager().get(resourcePath);
+            assertNull("Resource should be unregistered: " + resourcePath, indexAfter);
+        }
+
+        // Other resources should either be recovered or not, depending on timing
+        LOGGER.info("Completed testConcurrentUnregisterDuringLazyRecovery");
+    }
+
+    /**
+     * Helper method to find resources that belong to the same dataset and partition.
+     * 
+     * @param minCount Minimum number of resources needed
+     * @return List of resource paths from the same dataset and partition
+     */
+    private List<String> findResourcesInSamePartition(int minCount) {
+        // Group resources by dataset ID and partition ID
+        Map<String, List<String>> resourcesByDatasetAndPartition = new HashMap<>();
+
+        LOGGER.info("Looking for at least {} resources in the same partition", minCount);
+
+        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+            LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
+            if (localResource != null) {
+                DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+                int datasetId = datasetLocalResource.getDatasetId();
+                int partitionId = datasetLocalResource.getPartition();
+
+                // Create a composite key using datasetId and partitionId
+                String key = datasetId + "_" + partitionId;
+                resourcesByDatasetAndPartition.computeIfAbsent(key, k -> new ArrayList<>()).add(resourcePath);
+            }
+        }
+
+        // Just log summary, not details
+        LOGGER.info("Found {} groups of resources by dataset/partition", resourcesByDatasetAndPartition.size());
+
+        // Find a group with enough resources
+        for (List<String> resources : resourcesByDatasetAndPartition.values()) {
+            if (resources.size() >= minCount) {
+                String firstResource = resources.get(0);
+                LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
+                DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+
+                LOGGER.info("Found suitable group with {} resources (dataset={}, partition={})", resources.size(),
+                        datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
+                return resources;
+            }
+        }
+
+        // If no group has enough resources, return the one with the most
+        List<String> bestGroup = resourcesByDatasetAndPartition.values().stream()
+                .max(Comparator.comparingInt(List::size)).orElse(Collections.emptyList());
+
+        if (!bestGroup.isEmpty()) {
+            String firstResource = bestGroup.get(0);
+            LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+
+            LOGGER.info("Using best available group: {} resources (dataset={}, partition={})", bestGroup.size(),
+                    datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
+        } else {
+            LOGGER.warn("Could not find any resources in the same partition");
+        }
+
+        return bestGroup;
+    }
+
+    /**
+     * Helper method to check if a resource path is contained in the recovered paths.
+     * This handles potential differences between the resource path string and the index.toString() format.
+     */
+    private boolean isResourcePathRecovered(String resourcePath) {
+        // Direct match check
+        if (recoveredResourcePaths.contains(resourcePath)) {
+            return true;
+        }
+
+        // Otherwise check whether any recovered path contains the resource path
+        // (index.toString() may embed the path in a longer formatted string)
+        for (String recoveredPath : recoveredResourcePaths) {
+            if (recoveredPath.contains(resourcePath)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cb4e068..320d089 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -138,9 +138,13 @@
 
     @Override
     public LocalResource get(String relativePath) throws HyracksDataException {
+        LocalResource resource = getLocalResourceFromCache(relativePath);
+        if (resource != null) {
+            return resource;
+        }
         beforeReadAccess();
         try {
-            LocalResource resource = resourceCache.getIfPresent(relativePath);
+            resource = resourceCache.getIfPresent(relativePath);
             if (resource == null) {
                 FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
                 resource = readLocalResource(resourceFile);
@@ -154,6 +158,19 @@
         }
     }
 
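+    /**
+     * Fast path for {@link #get(String)}: probe the resource cache and return the
+     * cached entry, or null on a miss so the caller falls back to reading from disk.
+     */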
+    private LocalResource getLocalResourceFromCache(String relativePath) {
+        beforeWriteAccess();
+        try {
+            return resourceCache.getIfPresent(relativePath);
+        } finally {
+            afterWriteAccess();
+        }
+    }
+
     @SuppressWarnings("squid:S1181")
     @Override
     public void insert(LocalResource resource) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index f5edc74..3f87c6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -68,7 +68,7 @@
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
         localResourceRepository = localResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
-        lcManager = new IndexLifecycleManager();
+        lcManager = new IndexLifecycleManager(appCtx, localResourceRepository);
     }
 
     public void close() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 8a67c71..b03f54f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -109,6 +109,11 @@
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
-        lcManager.register(resourceRelPath, index);
+        IIndex registeredIndex = lcManager.registerIfAbsent(resourceRelPath, index);
+        if (registeredIndex != index) {
+            // another thread has already registered an index under this path,
+            // meaning this is not the first time the index is being created
+            throw new HyracksDataException("Index with resource ID " + resourceId + " already exists.");
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index b79c3b1..550ad24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -20,32 +20,25 @@
 package org.apache.hyracks.storage.am.common.dataflow;
 
 import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 
+@NotThreadSafe
 public class IndexDataflowHelper implements IIndexDataflowHelper {
 
-    private static final Logger LOGGER = LogManager.getLogger();
-    private final INCServiceContext ctx;
     private final IResourceLifecycleManager<IIndex> lcManager;
     private final ILocalResourceRepository localResourceRepository;
     private final FileReference resourceRef;
     private IIndex index;
 
-    public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef)
-            throws HyracksDataException {
-        this.ctx = ctx;
+    public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef) {
         this.lcManager = storageMgr.getLifecycleManager(ctx);
         this.localResourceRepository = storageMgr.getLocalResourceRepository(ctx);
         this.resourceRef = resourceRef;
@@ -58,56 +51,18 @@
 
     @Override
     public void open() throws HyracksDataException {
-        //Get local resource file
-        synchronized (lcManager) {
-            index = lcManager.get(resourceRef.getRelativePath());
-            if (index == null) {
-                LocalResource lr = readIndex();
-                lcManager.register(lr.getPath(), index);
-            }
-            lcManager.open(resourceRef.getRelativePath());
-        }
-    }
-
-    private LocalResource readIndex() throws HyracksDataException {
-        // Get local resource
-        LocalResource lr = getResource();
-        if (lr == null) {
-            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourceRef.getRelativePath());
-        }
-        IResource resource = lr.getResource();
-        index = resource.createInstance(ctx);
-        return lr;
+        index = lcManager.registerIfAbsent(resourceRef.getRelativePath(), index);
+        lcManager.open(resourceRef.getRelativePath());
     }
 
     @Override
     public void close() throws HyracksDataException {
-        synchronized (lcManager) {
-            lcManager.close(resourceRef.getRelativePath());
-        }
+        lcManager.close(resourceRef.getRelativePath());
     }
 
     @Override
     public void destroy() throws HyracksDataException {
-        LOGGER.log(Level.INFO, "Dropping index " + resourceRef.getRelativePath() + " on node " + ctx.getNodeId());
-        synchronized (lcManager) {
-            index = lcManager.get(resourceRef.getRelativePath());
-            if (index != null) {
-                lcManager.unregister(resourceRef.getRelativePath());
-            } else {
-                readIndex();
-            }
-
-            if (getResourceId() != -1) {
-                localResourceRepository.delete(resourceRef.getRelativePath());
-            }
-            index.destroy();
-        }
-    }
-
-    private long getResourceId() throws HyracksDataException {
-        LocalResource lr = localResourceRepository.get(resourceRef.getRelativePath());
-        return lr == null ? -1 : lr.getId();
+        lcManager.destroy(resourceRef.getRelativePath());
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index ab301a9..dd80bcb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -25,24 +25,33 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
+import org.apache.hyracks.storage.common.LocalResource;
 
 public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, ILifeCycleComponent {
     private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
 
     private final Map<String, IndexInfo> indexInfos;
     private final long memoryBudget;
+    private final INCServiceContext appCtx;
+    private final ILocalResourceRepository resourceRepository;
     private long memoryUsed;
 
-    public IndexLifecycleManager() {
-        this(DEFAULT_MEMORY_BUDGET);
+    public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository) {
+        this(appCtx, resourceRepository, DEFAULT_MEMORY_BUDGET);
     }
 
-    public IndexLifecycleManager(long memoryBudget) {
+    public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository,
+            long memoryBudget) {
+        this.appCtx = appCtx;
+        this.resourceRepository = resourceRepository;
         this.indexInfos = new HashMap<>();
         this.memoryBudget = memoryBudget;
         this.memoryUsed = 0;
@@ -159,11 +168,34 @@
     }
 
     @Override
-    public void register(String resourcePath, IIndex index) throws HyracksDataException {
+    public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
         if (indexInfos.containsKey(resourcePath)) {
-            throw new HyracksDataException("Index with resource name " + resourcePath + " already exists.");
+            return indexInfos.get(resourcePath).index;
+        }
+        if (index == null) {
+            index = getOrCreate(resourcePath);
         }
         indexInfos.put(resourcePath, new IndexInfo(index));
+        return index;
+    }
+
+    public IIndex getOrCreate(String resourcePath) throws HyracksDataException {
+        IIndex index = get(resourcePath);
+        if (index == null) {
+            index = readIndex(resourcePath);
+            registerIfAbsent(resourcePath, index);
+        }
+        return index;
+    }
+
+    private IIndex readIndex(String resourcePath) throws HyracksDataException {
+        // Get local resource
+        LocalResource lr = resourceRepository.get(resourcePath);
+        if (lr == null) {
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+        }
+        IResource resource = lr.getResource();
+        return resource.createInstance(appCtx);
     }
 
     @Override
@@ -209,4 +241,24 @@
         }
         indexInfos.remove(resourcePath);
     }
+
+    @Override
+    public void destroy(String resourcePath) throws HyracksDataException {
+        IIndex index = get(resourcePath);
+        if (index != null) {
+            unregister(resourcePath);
+        } else {
+            index = readIndex(resourcePath);
+        }
+
+        if (getResourceId(resourcePath) != -1) {
+            resourceRepository.delete(resourcePath);
+        }
+        index.destroy();
+    }
+
+    private long getResourceId(String resourcePath) throws HyracksDataException {
+        LocalResource lr = resourceRepository.get(resourcePath);
+        return lr == null ? -1 : lr.getId();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
index 6200680..1d74379 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
@@ -41,10 +41,10 @@
      *
      * @param resourceId
      * @param resource
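+     * @return the resource already registered under this id if present; otherwise the
+     *         newly registered resource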
-     * @throws HyracksDataException
-     *             if a resource is already registered with this resourceId
      */
-    public void register(String resourceId, R resource) throws HyracksDataException;
+    public R registerIfAbsent(String resourceId, R resource) throws HyracksDataException;
 
     /**
      * Opens a resource. The resource is moved to the open state
@@ -75,8 +73,15 @@
     /**
      * unregister a resource removing its resources in memory and on disk
      *
-     * @param resourceId
+     * @param resourcePath
      * @throws HyracksDataException
      */
-    public void unregister(String resourceId) throws HyracksDataException;
+    public void unregister(String resourcePath) throws HyracksDataException;
+
+    /**
+     * destroy the resource, removing it from memory if registered and deleting its on-disk state if present
+     * @param resourcePath
+     * @throws HyracksDataException
+     */
+    public void destroy(String resourcePath) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index 0b4d2ed..1328bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -59,7 +59,7 @@
 
     @Override
     public IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx) {
-        return TestStorageManagerComponentHolder.getIndexLifecycleManager();
+        return TestStorageManagerComponentHolder.getIndexLifecycleManager(ctx);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 7f696b4..1f7c0ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -78,9 +78,9 @@
         lcManager = null;
     }
 
-    public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() {
+    public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager(INCServiceContext ctx) {
         if (lcManager == null) {
-            lcManager = new IndexLifecycleManager();
+            lcManager = new IndexLifecycleManager(ctx, getLocalResourceRepository());
         }
         return lcManager;
     }