Revert "[ASTERIXDB-3574][STO] Taking resource-level lock instead of global lock"

Reason for Revert: causing some correctness issue while caching/uncaching files

Ext-ref: MB-65695

Change-Id: I0bc0afaccaaf3519fa6b51df06b6077933f91461
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19691
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 5035d25..269f6f7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -284,6 +284,7 @@
         long lsn = -1;
         ILSMIndex index = null;
         LocalResource localResource = null;
+        DatasetLocalResource localResourceMetadata = null;
         boolean foundWinner = false;
         JobEntityCommits jobEntityWinners = null;
 
@@ -353,11 +354,12 @@
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
                             //get partition path in this node
+                            localResourceMetadata = (DatasetLocalResource) localResource.getResource();
                             index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
                             if (index == null) {
                                 //#. create index instance and register to indexLifeCycleManager
-                                index = (ILSMIndex) datasetLifecycleManager.registerIfAbsent(localResource.getPath(),
-                                        null);
+                                index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
+                                datasetLifecycleManager.register(localResource.getPath(), index);
                                 datasetLifecycleManager.open(localResource.getPath());
                                 try {
                                     maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 2803f21..8d40acb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -19,10 +19,10 @@
 package org.apache.asterix.common.context;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.api.IIOBlockingOperation;
@@ -60,8 +60,8 @@
     private boolean durable;
 
     public DatasetInfo(int datasetID, ILogManager logManager) {
-        this.partitionIndexes = new ConcurrentHashMap<>();
-        this.indexes = new ConcurrentHashMap<>();
+        this.partitionIndexes = new HashMap<>();
+        this.indexes = new HashMap<>();
         this.partitionPendingIO = new Int2IntOpenHashMap();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
@@ -203,13 +203,13 @@
         return Collections.unmodifiableMap(indexes);
     }
 
-    public void addIndex(long resourceID, IndexInfo indexInfo) {
+    public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
         indexes.put(resourceID, indexInfo);
         partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
         LOGGER.debug("registered reference to index {}", indexInfo);
     }
 
-    public void removeIndex(long resourceID) {
+    public synchronized void removeIndex(long resourceID) {
         IndexInfo info = indexes.remove(resourceID);
         if (info != null) {
             partitionIndexes.get(info.getPartition()).remove(info);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 0128297..96f7241 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,13 +23,10 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
@@ -65,16 +62,13 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
 import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
-import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-@ThreadSafe
 public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -88,8 +82,6 @@
     private final LogRecord waitLog;
     protected final IDiskResourceCacheLockNotifier lockNotifier;
     private volatile boolean stopped = false;
-    private final ReentrantReadWriteLock stopLock;
-    private final Map<String, ReentrantReadWriteLock> resourceLocks;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     // all LSM-trees share the same virtual buffer cache list
     private final List<IVirtualBufferCache> vbcs;
@@ -104,7 +96,6 @@
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
         this.vbc = vbc;
-        this.stopLock = new ReentrantReadWriteLock();
         int numMemoryComponents = storageProperties.getMemoryComponentsNum();
         this.vbcs = new ArrayList<>(numMemoryComponents);
         for (int i = 0; i < numMemoryComponents; i++) {
@@ -112,14 +103,13 @@
         }
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.lockNotifier = lockNotifier;
-        this.resourceLocks = Collections.synchronizedMap(new WeakHashMap<>());
         waitLog = new LogRecord();
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
     }
 
     @Override
-    public ILSMIndex get(String resourcePath) throws HyracksDataException {
+    public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -127,7 +117,7 @@
     }
 
     @Override
-    public ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+    public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         DatasetResource datasetResource = datasets.get(datasetID);
         if (datasetResource == null) {
@@ -137,52 +127,16 @@
     }
 
     @Override
-    public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
-        stopLock.readLock().lock();
-        try {
-            validateDatasetLifecycleManagerState();
-
-            IIndex existingIndex = get(resourcePath);
-            if (existingIndex != null) {
-                return existingIndex;
-            }
-
-            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
-            resourceLock.writeLock().lock();
-            try {
-                existingIndex = get(resourcePath);
-                if (existingIndex != null) {
-                    return existingIndex;
-                }
-
-                if (index == null) {
-                    index = getOrCreateIndex(resourcePath);
-                }
-
-                int datasetID = getDIDfromResourcePath(resourcePath);
-                LocalResource resource = resourceRepository.get(resourcePath);
-                lockNotifier.onRegister(resource, index);
-
-                DatasetResource datasetResource = datasets.get(datasetID);
-                if (datasetResource == null) {
-                    datasetResource = getDatasetLifecycle(datasetID);
-                }
-
-                datasetResource.register(resource, (ILSMIndex) index);
-            } finally {
-                resourceLock.writeLock().unlock();
-            }
-        } finally {
-            stopLock.readLock().unlock();
+    public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        LocalResource resource = resourceRepository.get(resourcePath);
+        DatasetResource datasetResource = datasets.get(did);
+        lockNotifier.onRegister(resource, index);
+        if (datasetResource == null) {
+            datasetResource = getDatasetLifecycle(did);
         }
-
-        return index;
-    }
-
-    private ReentrantReadWriteLock getResourceLock(String resourcePath) {
-        // create fair locks inorder to avoid starving.
-        // actually I believe there shouldn't be any kinda starving as the separate locks are created for each resource.
-        return resourceLocks.computeIfAbsent(resourcePath, k -> new ReentrantReadWriteLock(true));
+        datasetResource.register(resource, (ILSMIndex) index);
     }
 
     protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -209,261 +163,115 @@
         return (DatasetLocalResource) lr.getResource();
     }
 
-    /**
-     * Concurrency considerations for dataset operations:
-     *
-     * This method requires dataset locks to handle concurrent operations:
-     *
-     * 1. Dataset-level locking:
-     *    - The open() method works on all indexes of a dataset.
-     *    - The first-time open() call triggers recoverIndex(), which acquires the dataset lock.
-     *    - Race conditions may occur if index recovery happens while an index is being dropped.
-     *
-     * 2. Lock acquisition strategy:
-     *    - Both the dataset lock and the local resource lock must be acquired.
-     *    - This prevents register/open operations from occurring while an unregister operation is in progress.
-     *    - These locks must be held until the unregister operation completes.
-     *
-     * 3. Possible race scenarios:
-     *    Consider the following threads:
-     *      - t1: unregister(ds/0/idx1)
-     *      - t2: open(ds/0/idx1)
-     *      - t3: unregister(ds/0/idx2)
-     *      - t4: unregister(newDs/0/idx)
-     *    - If t2 is running, t1 and t3 should wait (same dataset), but t4 can proceed (different dataset).
-     *    - If t2 hasn't started yet (dataset lock not acquired), t1 and t3 could execute.
-     *
-     * 4. Race condition handling:
-     *    - If t1 starts unregistering, t2 starts opening, and t3 starts unregistering simultaneously:
-     *      - t2 takes the dataset lock.
-     *      - Depending on the timing of registration completion, the resource repository may or may not contain the index.
-     *      - If the index does not exist, an INDEX_DOES_NOT_EXIST error will occur.
-     *
-     * Note: At the compilation layer, exclusive dataset locks prevent DROP/UNREGISTER operations
-     *       from colliding with OPEN/REGISTER operations.
-     */
     @Override
-    public void unregister(String resourcePath) throws HyracksDataException {
-        stopLock.readLock().lock();
-        try {
-            validateDatasetLifecycleManagerState();
-            String datasetPartitionPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
+    public synchronized void unregister(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
-            ReentrantReadWriteLock partitionResourceLock = getResourceLock(datasetPartitionPath);
-            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
-            partitionResourceLock.writeLock().lock();
-            try {
-                resourceLock.writeLock().lock();
-                try {
-                    int did = getDIDfromResourcePath(resourcePath);
-                    long resourceID = getResourceIDfromResourcePath(resourcePath);
-                    DatasetResource dsr = datasets.get(did);
-                    IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+        if (dsr == null || iInfo == null) {
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+        }
 
-                    if (dsr == null || iInfo == null) {
-                        throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
-                    }
-
-                    lockNotifier.onUnregister(resourceID);
-                    PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
-                    if (iInfo.getReferenceCount() != 0
-                            || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
-                        if (LOGGER.isErrorEnabled()) {
-                            final String logMsg = String.format(
-                                    "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
-                                    resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
-                            LOGGER.error(logMsg);
-                        }
-                        throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
-                                StoragePathUtil.getIndexNameFromPath(resourcePath));
-                    }
-
-                    // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
-                    DatasetInfo dsInfo = dsr.getDatasetInfo();
-                    dsInfo.waitForIO();
-                    closeIndex(iInfo);
-                    dsInfo.removeIndex(resourceID);
-                    synchronized (dsInfo) {
-                        int referenceCount = dsInfo.getReferenceCount();
-                        boolean open = dsInfo.isOpen();
-                        boolean empty = dsInfo.getIndexes().isEmpty();
-                        if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
-                            LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
-                            removeDatasetFromCache(dsInfo.getDatasetID());
-                        } else {
-                            LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
-                                    dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
-                        }
-                    }
-                } finally {
-                    resourceLock.writeLock().unlock();
-                }
-            } finally {
-                partitionResourceLock.writeLock().unlock();
+        lockNotifier.onUnregister(resourceID);
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+            if (LOGGER.isErrorEnabled()) {
+                final String logMsg = String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                LOGGER.error(logMsg);
             }
-        } finally {
-            stopLock.readLock().unlock();
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                    StoragePathUtil.getIndexNameFromPath(resourcePath));
+        }
+
+        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        dsInfo.waitForIO();
+        closeIndex(iInfo);
+        dsInfo.removeIndex(resourceID);
+        synchronized (dsInfo) {
+            int referenceCount = dsInfo.getReferenceCount();
+            boolean open = dsInfo.isOpen();
+            boolean empty = dsInfo.getIndexes().isEmpty();
+            if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
+                LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
+                removeDatasetFromCache(dsInfo.getDatasetID());
+            } else {
+                LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
+                        dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
+            }
         }
     }
 
     @Override
-    public void destroy(String resourcePath) throws HyracksDataException {
-        stopLock.readLock().lock();
+    public synchronized void open(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
+        if (localResource == null) {
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+        }
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+        lockNotifier.onOpen(resourceID);
         try {
-            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
-            resourceLock.writeLock().lock();
-            try {
-                LOGGER.info("Dropping index {} on node {}", resourcePath, serviceCtx.getNodeId());
-                IIndex index = get(resourcePath);
-                if (index != null) {
-                    unregister(resourcePath);
-                } else {
-                    index = readIndex(resourcePath);
-                }
-                if (getResourceId(resourcePath) != -1) {
-                    resourceRepository.delete(resourcePath);
-                }
-                index.destroy();
-            } finally {
-                resourceLock.writeLock().unlock();
+            DatasetResource datasetResource = datasets.get(did);
+            int partition = localResource.getPartition();
+            if (shouldRecoverLazily(datasetResource, partition)) {
+                performLocalRecovery(resourcePath, datasetResource, partition);
+            } else {
+                openResource(resourcePath, false);
             }
         } finally {
-            stopLock.readLock().unlock();
+            lockNotifier.onClose(resourceID);
         }
     }
 
-    private long getResourceId(String resourcePath) throws HyracksDataException {
-        LocalResource lr = resourceRepository.get(resourcePath);
-        return lr == null ? -1 : lr.getId();
-    }
-
-    @Override
-    public void open(String resourcePath) throws HyracksDataException {
-        stopLock.readLock().lock();
-        try {
-            validateDatasetLifecycleManagerState();
-
-            DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
-            if (localResource == null) {
-                throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
-            }
-
-            int did = getDIDfromResourcePath(resourcePath);
-            long resourceID = getResourceIDfromResourcePath(resourcePath);
-
-            // Notify before opening a resource
-            lockNotifier.onOpen(resourceID);
-            try {
-                DatasetResource datasetResource = datasets.get(did);
-                int partition = localResource.getPartition();
-                boolean lazyRecover = shouldRecoverLazily(datasetResource, partition);
-
-                if (!lazyRecover) {
-                    openResource(resourcePath, false);
-                    return;
-                }
-
-                // Perform local recovery by taking a lock on the root resource
-                boolean recoveredByCurrentThread = performLocalRecovery(resourcePath, datasetResource, partition);
-
-                /*
-                 * Concurrent Access Scenario for Index Resource Recovery:
-                 * ------------------------------------------------------
-                 * When multiple threads attempt to open the same index resource, a race
-                 * condition can occur.
-                 *
-                 * Example:
-                 * - Thread1 (handling ds/0/idx1) and Thread2 (handling ds/0/idx2) may both
-                 *   try to recover dataset indexes.
-                 * - Thread1 enters `ensureIndexOpenAndConsistent()`, detects that the resource
-                 *   is not recovered, and starts recovery.
-                 * - Before Thread1 marks recovery as complete, Thread2 also checks and finds
-                 *   the resource not recovered, so it attempts recovery too.
-                 *
-                 * However, index recovery is an expensive operation and should be performed by
-                 * only one thread. To prevent multiple recoveries, `ensureIndexOpenAndConsistent()`
-                 * must be synchronized on the root dataset resource. This ensures that only one
-                 * thread performs the recovery, while others wait.
-                 *
-                 * Behavior After Synchronization:
-                 * - Thread1 recovers and marks the index as recovered.
-                 * - Thread2, after acquiring the lock, sees that the index is already recovered.
-                 * - It returns `false` from `ensureIndexOpenAndConsistent()` and proceeds to call
-                 *   `open()`.
-                 * - Since `open()` is idempotent, if the index is already open (`iInfo.isOpen()`),
-                 *   it does nothing except incrementing open stats.
-                 */
-                if (!recoveredByCurrentThread) {
-                    openResource(resourcePath, false);
-                }
-            } finally {
-                lockNotifier.onClose(resourceID);
-            }
-        } finally {
-            stopLock.readLock().unlock();
-        }
-    }
-
-    private boolean performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
+    private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
             throws HyracksDataException {
+        LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
+                partition);
+        FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath);
+        Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
 
-        String indexRootRefPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
-        ReentrantReadWriteLock resourceLock = getResourceLock(indexRootRefPath);
-        FileReference indexRootRef = serviceCtx.getIoManager().resolve(indexRootRefPath);
-        resourceLock.writeLock().lock();
-        try {
-            if (!shouldRecoverLazily(datasetResource, partition)) {
-                return false;
-            }
-            LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
-                    partition);
-            Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
-
-            List<ILSMIndex> indexes = new ArrayList<>();
-            for (LocalResource resource : resources.values()) {
-                if (shouldSkipRecoveringResource(resource)) {
-                    continue;
-                }
-
-                ILSMIndex index = (ILSMIndex) registerIfAbsent(resource.getPath(), null);
-                boolean undoTouch = !resourcePath.equals(resource.getPath());
-                openResource(resource.getPath(), undoTouch);
-                indexes.add(index);
+        List<ILSMIndex> indexes = new ArrayList<>();
+        for (LocalResource resource : resources.values()) {
+            if (shouldSkipResource(resource)) {
+                continue;
             }
 
-            if (!indexes.isEmpty()) {
-                recoveryMgr.recoverIndexes(indexes);
-            }
-
-            datasetResource.markRecovered(partition);
-            return true;
-        } finally {
-            resourceLock.writeLock().unlock();
+            ILSMIndex index = getOrCreateIndex(resource);
+            boolean undoTouch = !resourcePath.equals(resource.getPath());
+            openResource(resource.getPath(), undoTouch);
+            indexes.add(index);
         }
+
+        if (!indexes.isEmpty()) {
+            recoveryMgr.recoverIndexes(indexes);
+        }
+
+        datasetResource.markRecovered(partition);
     }
 
-    private boolean shouldSkipRecoveringResource(LocalResource resource) {
+    private boolean shouldSkipResource(LocalResource resource) {
         DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
         return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
                 || (lr.getResource() instanceof LSMBTreeLocalResource
                         && ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance());
     }
 
-    private IIndex getOrCreateIndex(String resourcePath) throws HyracksDataException {
-        IIndex index = get(resourcePath);
-        if (index != null) {
-            return index;
+    private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException {
+        ILSMIndex index = get(resource.getPath());
+        if (index == null) {
+            DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+            index = (ILSMIndex) lr.createInstance(serviceCtx);
+            register(resource.getPath(), index);
         }
-        return readIndex(resourcePath);
-    }
-
-    private IIndex readIndex(String resourcePath) throws HyracksDataException {
-        LocalResource lr = resourceRepository.get(resourcePath);
-        if (lr == null) {
-            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
-        }
-        IResource resource = lr.getResource();
-        return resource.createInstance(serviceCtx);
+        return index;
     }
 
     private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException {
@@ -492,15 +300,11 @@
         boolean indexTouched = false;
         try {
             if (!iInfo.isOpen()) {
-                synchronized (iInfo) {
-                    if (!iInfo.isOpen()) {
-                        ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
-                        synchronized (opTracker) {
-                            iInfo.getIndex().activate();
-                        }
-                        iInfo.setOpen(true);
-                    }
+                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
+                synchronized (opTracker) {
+                    iInfo.getIndex().activate();
                 }
+                iInfo.setOpen(true);
             }
             iInfo.touch();
             indexTouched = true;
@@ -530,7 +334,15 @@
         if (dsr != null) {
             return dsr;
         }
-        return datasets.computeIfAbsent(did, k -> new DatasetResource(new DatasetInfo(did, logManager)));
+        synchronized (datasets) {
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                DatasetInfo dsInfo = new DatasetInfo(did, logManager);
+                dsr = new DatasetResource(dsInfo);
+                datasets.put(did, dsr);
+            }
+            return dsr;
+        }
     }
 
     @Override
@@ -539,62 +351,46 @@
     }
 
     @Override
-    public void close(String resourcePath) throws HyracksDataException {
-        stopLock.readLock().lock();
+    public synchronized void close(String resourcePath) throws HyracksDataException {
+        DatasetResource dsr = null;
+        IndexInfo iInfo = null;
         try {
-            DatasetResource dsr = null;
-            IndexInfo iInfo = null;
-
-            // A resource lock may not be necessary if the unregister case does not need handling.
-            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
-            resourceLock.writeLock().lock();
-            try {
-                validateDatasetLifecycleManagerState();
-
-                int did = getDIDfromResourcePath(resourcePath);
-                long resourceID = getResourceIDfromResourcePath(resourcePath);
-
-                dsr = datasets.get(did);
-                if (dsr == null) {
-                    throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
-                }
-
-                iInfo = dsr.getIndexInfo(resourceID);
-                if (iInfo == null) {
-                    throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
-                }
-
-                lockNotifier.onClose(resourceID);
-            } finally {
-                // Regardless of any exceptions thrown in the try block (e.g., missing index),
-                // we must ensure that the index and dataset are marked as untouched.
-                if (iInfo != null) {
-                    iInfo.untouch();
-                }
-                if (dsr != null) {
-                    dsr.untouch();
-                }
-                resourceLock.writeLock().unlock();
+            validateDatasetLifecycleManagerState();
+            int did = getDIDfromResourcePath(resourcePath);
+            long resourceID = getResourceIDfromResourcePath(resourcePath);
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
             }
+            iInfo = dsr.getIndexInfo(resourceID);
+            if (iInfo == null) {
+                throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+            }
+            lockNotifier.onClose(resourceID);
         } finally {
-            stopLock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public List<IIndex> getOpenResources() {
-        synchronized (datasets) {
-            List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
-            List<IIndex> openIndexes = new ArrayList<>();
-            for (IndexInfo iInfo : openIndexesInfo) {
-                openIndexes.add(iInfo.getIndex());
+            // Regardless of what exception is thrown in the try-block (e.g., line 279),
+            // we have to un-touch the index and dataset.
+            if (iInfo != null) {
+                iInfo.untouch();
             }
-            return openIndexes;
+            if (dsr != null) {
+                dsr.untouch();
+            }
         }
     }
 
     @Override
-    public List<IndexInfo> getOpenIndexesInfo() {
+    public synchronized List<IIndex> getOpenResources() {
+        List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
+        List<IIndex> openIndexes = new ArrayList<>();
+        for (IndexInfo iInfo : openIndexesInfo) {
+            openIndexes.add(iInfo.getIndex());
+        }
+        return openIndexes;
+    }
+
+    @Override
+    public synchronized List<IndexInfo> getOpenIndexesInfo() {
         List<IndexInfo> openIndexesInfo = new ArrayList<>();
         for (DatasetResource dsr : datasets.values()) {
             for (IndexInfo iInfo : dsr.getIndexes().values()) {
@@ -616,50 +412,27 @@
     }
 
     @Override
-    public PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String resourcePath) {
+    public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
         DatasetResource dataset = getDatasetLifecycle(datasetId);
         PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
-        if (opTracker != null) {
-            return opTracker;
-        }
-        ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
-        resourceLock.writeLock().lock();
-        try {
+        if (opTracker == null) {
+            populateOpTrackerAndIdGenerator(dataset, partition, path);
             opTracker = dataset.getOpTracker(partition);
-            if (opTracker != null) {
-                return opTracker;
-            }
-            populateOpTrackerAndIdGenerator(dataset, partition, resourcePath);
-            opTracker = dataset.getOpTracker(partition);
-            return opTracker;
-        } finally {
-            resourceLock.writeLock().unlock();
         }
+        return opTracker;
     }
 
     @Override
-    public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
-        DatasetResource dataset = getDatasetLifecycle(datasetId);
+    public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
+        DatasetResource dataset = datasets.get(datasetId);
         ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
-        if (generator != null) {
-            return generator;
-        }
-        ReentrantReadWriteLock resourceLock = getResourceLock(path);
-        resourceLock.writeLock().lock();
-        try {
-            generator = dataset.getComponentIdGenerator(partition);
-            if (generator != null) {
-                return generator;
-            }
+        if (generator == null) {
             populateOpTrackerAndIdGenerator(dataset, partition, path);
             generator = dataset.getComponentIdGenerator(partition);
-            return generator;
-        } finally {
-            resourceLock.writeLock().unlock();
         }
+        return generator;
     }
 
-    // this function is not being used.
     @Override
     public synchronized IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit) {
         DatasetResource dataset = datasets.get(datasetId);
@@ -671,7 +444,7 @@
     }
 
     @Override
-    public boolean isRegistered(int datasetId) {
+    public synchronized boolean isRegistered(int datasetId) {
         return datasets.containsKey(datasetId);
     }
 
@@ -703,39 +476,34 @@
     }
 
     @Override
-    public void flushAllDatasets() throws HyracksDataException {
+    public synchronized void flushAllDatasets() throws HyracksDataException {
         flushAllDatasets(partition -> true);
     }
 
     @Override
-    public void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
-        synchronized (datasets) {
-            for (DatasetResource dsr : datasets.values()) {
-                if (dsr.getDatasetInfo().isOpen()) {
-                    flushDatasetOpenIndexes(dsr, partitions, false);
-                }
+    public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
+        for (DatasetResource dsr : datasets.values()) {
+            if (dsr.getDatasetInfo().isOpen()) {
+                flushDatasetOpenIndexes(dsr, partitions, false);
             }
         }
     }
 
     @Override
-    public void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
-        synchronized (datasets) {
-            DatasetResource dsr = datasets.get(datasetId);
-            if (dsr != null) {
-                flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
-            }
+    public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
+        DatasetResource dsr = datasets.get(datasetId);
+        if (dsr != null) {
+            flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
         }
     }
 
     @Override
-    public void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) throws HyracksDataException {
-        synchronized (datasets) {
-            for (DatasetResource dsr : datasets.values()) {
-                for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
-                    synchronized (opTracker) {
-                        asyncFlush(dsr, opTracker, indexPredicate);
-                    }
+    public synchronized void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate)
+            throws HyracksDataException {
+        for (DatasetResource dsr : datasets.values()) {
+            for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+                synchronized (opTracker) {
+                    asyncFlush(dsr, opTracker, indexPredicate);
                 }
             }
         }
@@ -826,45 +594,38 @@
     }
 
     @Override
-    public void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
-        synchronized (datasets) {
-            for (DatasetResource dsr : datasets.values()) {
-                if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
-                    closeDataset(dsr);
-                }
+    public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
+                closeDataset(dsr);
             }
         }
     }
 
     @Override
-    public void closeAllDatasets() throws HyracksDataException {
-        synchronized (datasets) {
-            for (DatasetResource dsr : datasets.values()) {
-                if (dsr.isOpen()) {
-                    closeDataset(dsr);
-                }
+    public synchronized void closeAllDatasets() throws HyracksDataException {
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.isOpen()) {
+                closeDataset(dsr);
             }
         }
     }
 
     @Override
     public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
-        stopLock.writeLock().lock();
-        try {
-            if (stopped) {
-                return;
-            }
-            if (dumpState) {
-                dumpState(outputStream);
-            }
-
-            closeAllDatasets();
-
-            datasets.clear();
-            stopped = true;
-        } finally {
-            stopLock.writeLock().unlock();
+        if (stopped) {
+            return;
         }
+        if (dumpState) {
+            dumpState(outputStream);
+        }
+
+        closeAllDatasets();
+
+        datasets.clear();
+        stopped = true;
     }
 
     @Override
@@ -904,11 +665,9 @@
     @Override
     public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
             throws HyracksDataException {
-        synchronized (datasets) {
-            for (DatasetResource dsr : datasets.values()) {
-                if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                    flushDatasetOpenIndexes(dsr, partitions, false);
-                }
+        for (DatasetResource dsr : datasets.values()) {
+            if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
+                flushDatasetOpenIndexes(dsr, partitions, false);
             }
         }
     }
@@ -963,60 +722,47 @@
 
     //TODO refactor this method with unregister method
     @Override
-    public void closeIfOpen(String resourcePath) throws HyracksDataException {
-        stopLock.readLock().lock();
-        try {
-            validateDatasetLifecycleManagerState();
-            ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
-            resourceLock.writeLock().lock();
-            try {
-                int did = getDIDfromResourcePath(resourcePath);
-                long resourceID = getResourceIDfromResourcePath(resourcePath);
+    public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
+        validateDatasetLifecycleManagerState();
+        int did = getDIDfromResourcePath(resourcePath);
+        long resourceID = getResourceIDfromResourcePath(resourcePath);
 
-                DatasetResource dsr = datasets.get(did);
-                IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+        DatasetResource dsr = datasets.get(did);
+        IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
 
-                if (dsr == null || iInfo == null) {
-                    return;
-                }
+        if (dsr == null || iInfo == null) {
+            return;
+        }
 
-                PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
-                if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
-                    if (LOGGER.isErrorEnabled()) {
-                        final String logMsg = String.format(
-                                "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
-                                resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
-                        LOGGER.error(logMsg);
-                    }
-                    throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
-                            StoragePathUtil.getIndexNameFromPath(resourcePath));
-                }
-
-                // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
-                DatasetInfo dsInfo = dsr.getDatasetInfo();
-                dsInfo.waitForIO();
-                closeIndex(iInfo);
-                dsInfo.removeIndex(resourceID);
-                synchronized (dsInfo) {
-                    if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
-                            && !dsInfo.isExternal()) {
-                        removeDatasetFromCache(dsInfo.getDatasetID());
-                    }
-                }
-            } finally {
-                resourceLock.writeLock().unlock();
+        PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+        if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+            if (LOGGER.isErrorEnabled()) {
+                final String logMsg = String.format(
+                        "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+                        resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+                LOGGER.error(logMsg);
             }
-        } finally {
-            stopLock.readLock().unlock();
+            throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+                    StoragePathUtil.getIndexNameFromPath(resourcePath));
+        }
+
+        // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+        DatasetInfo dsInfo = dsr.getDatasetInfo();
+        dsInfo.waitForIO();
+        closeIndex(iInfo);
+        dsInfo.removeIndex(resourceID);
+        synchronized (dsInfo) {
+            if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+                    && !dsInfo.isExternal()) {
+                removeDatasetFromCache(dsInfo.getDatasetID());
+            }
         }
     }
 
     @Override
-    public void closePartition(int partitionId) {
-        synchronized (datasets) {
-            for (DatasetResource ds : datasets.values()) {
-                ds.removePartition(partitionId);
-            }
+    public synchronized void closePartition(int partitionId) {
+        for (DatasetResource ds : datasets.values()) {
+            ds.removePartition(partitionId);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 0c8f141..8e3081d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -19,10 +19,10 @@
 package org.apache.asterix.common.context;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -54,9 +54,9 @@
 
     public DatasetResource(DatasetInfo datasetInfo) {
         this.datasetInfo = datasetInfo;
-        this.datasetPrimaryOpTrackers = new ConcurrentHashMap<>();
-        this.datasetComponentIdGenerators = new ConcurrentHashMap<>();
-        this.datasetRateLimiters = new ConcurrentHashMap<>();
+        this.datasetPrimaryOpTrackers = new HashMap<>();
+        this.datasetComponentIdGenerators = new HashMap<>();
+        this.datasetRateLimiters = new HashMap<>();
         this.recoveredPartitions = new HashSet<>();
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9f0f5c7..2fdfba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -139,9 +139,11 @@
         return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
     }
 
-    public static String getDatasetPartitionPath(String relativePath) {
+    public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath)
+            throws HyracksDataException {
         int separatorIndex = relativePath.lastIndexOf(File.separatorChar);
-        return relativePath.substring(0, separatorIndex);
+        String parentDirectory = relativePath.substring(0, separatorIndex);
+        return ioManager.resolve(parentDirectory);
     }
 
     /**
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
deleted file mode 100644
index d15a9d4..0000000
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
+++ /dev/null
@@ -1,2104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.replication.IReplicationJob;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
-import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.IIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndexBulkLoader;
-import org.apache.hyracks.storage.common.IIndexCursor;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.ISearchPredicate;
-import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
-import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Concurrent tests for the DatasetLifecycleManager class.
- *
- * This test class verifies that the DatasetLifecycleManager properly handles
- * concurrent register, open, close, and unregister operations on indexes.
- *
- * The tests use mock objects to simulate the index lifecycle and verify:
- * 1. Thread safety of operations
- * 2. Correct handling of re-registration attempts
- * 3. Proper activation/deactivation during open/close
- * 4. Correct identity preservation for index instances
- * 5. Concurrent access to shared resources
- *
- * Each test method focuses on a specific aspect of concurrent behavior:
- * - testConcurrentRegisterAndOpen: Tests register and open operations
- * - testConcurrentRegisterOpenAndUnregister: Tests the full lifecycle
- * - testRegisterAlreadyRegisteredIndex: Tests re-registration behavior
- * - testConcurrentOpenSameIndex: Tests opening the same index concurrently
- * - testConcurrentCloseAfterOpen: Tests closing after open operations
- * - testMockIndexIdentity: Tests the mock setup itself
- */
-public class DatasetLifecycleManagerConcurrentTest {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    // Configuration constants
-    private static final int NUM_DATASETS = 3;
-    private static final int NUM_PARTITIONS = 2;
-    private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
-    private static final int NUM_OPERATIONS_PER_THREAD = 20;
-
-    // For resource paths
-    private static final String DB_NAME = "Default";
-    private static final String SCOPE_NAME = "SDefault";
-    private static final String STORAGE_DIR = "storage";
-
-    private DatasetLifecycleManager datasetLifecycleManager;
-    private INCServiceContext mockServiceContext;
-    private StorageProperties mockStorageProperties;
-    private ILocalResourceRepository mockResourceRepository;
-    private IRecoveryManager mockRecoveryManager;
-    private ILogManager mockLogManager;
-    private IVirtualBufferCache mockVBC;
-    private IIndexCheckpointManagerProvider mockCheckpointProvider;
-    private IDiskResourceCacheLockNotifier mockLockNotifier;
-    private IIOManager mockIOManager;
-
-    private Map<String, MockIndex> mockIndexes;
-    private Map<String, LocalResource> mockResources;
-
-    private ExecutorService executorService;
-    private List<String> createdResourcePaths;
-
-    private Map<Integer, DatasetResource> datasetResourceMap;
-
-    // Add this field after the other private fields
-    private boolean lazyRecoveryEnabled = false;
-
-    /**
-     * Sets up the test environment with mock objects and resources.
-     * This includes:
-     * 1. Creating mock service context and other components
-     * 2. Setting up mock resources with unique IDs
-     * 3. Creating the DatasetLifecycleManager with mock dependencies
-     * 4. Setting up a thread pool for concurrent operations
-     */
-    @Before
-    public void setUp() throws Exception {
-        // Initialize collections
-        mockIndexes = new ConcurrentHashMap<>();
-        mockResources = new ConcurrentHashMap<>();
-        createdResourcePaths = new ArrayList<>();
-        datasetResourceMap = new HashMap<>();
-
-        // Set up mocks
-        mockServiceContext = mock(INCServiceContext.class);
-        mockStorageProperties = mock(StorageProperties.class);
-        mockResourceRepository = mock(ILocalResourceRepository.class);
-        mockRecoveryManager = mock(IRecoveryManager.class);
-        mockLogManager = mock(ILogManager.class);
-        mockVBC = mock(IVirtualBufferCache.class);
-        mockCheckpointProvider = mock(IIndexCheckpointManagerProvider.class);
-        mockLockNotifier = mock(IDiskResourceCacheLockNotifier.class);
-        mockIOManager = mock(IIOManager.class);
-
-        // Mock behavior
-        when(mockServiceContext.getIoManager()).thenReturn(mockIOManager);
-        when(mockStorageProperties.getMemoryComponentsNum()).thenReturn(2);
-        when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(lazyRecoveryEnabled);
-
-        // Create DatasetLifecycleManager FIRST
-        datasetLifecycleManager =
-                new DatasetLifecycleManager(mockServiceContext, mockStorageProperties, mockResourceRepository,
-                        mockRecoveryManager, mockLogManager, mockVBC, mockCheckpointProvider, mockLockNotifier);
-
-        // NEXT get the datasets map via reflection
-        try {
-            Field datasetsField = DatasetLifecycleManager.class.getDeclaredField("datasets");
-            datasetsField.setAccessible(true);
-            @SuppressWarnings("unchecked")
-            Map<Integer, DatasetResource> datasets =
-                    (Map<Integer, DatasetResource>) datasetsField.get(datasetLifecycleManager);
-            if (datasets != null) {
-                LOGGER.info("Accessed datasets map successfully, found {} entries", datasets.size());
-                // Store a direct reference to the actual map
-                datasetResourceMap = datasets;
-            } else {
-                LOGGER.warn("Retrieved datasets map is null");
-            }
-        } catch (Exception e) {
-            LOGGER.error("Failed to access datasets field via reflection", e);
-        }
-
-        // FINALLY setup mock resources (so they get the real datasets map)
-        setupMockResources();
-
-        executorService = Executors.newFixedThreadPool(NUM_THREADS);
-    }
-
-    /**
-     * Helper method to set the lazy recovery flag and reconfigure the mock
-     */
-    public void setLazyRecoveryEnabled(boolean enabled) {
-        this.lazyRecoveryEnabled = enabled;
-        when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(enabled);
-        LOGGER.info("Set lazy recovery enabled to: {}", enabled);
-    }
-
-    /**
-     * Creates mock resources for testing.
-     * For each combination of dataset and partition:
-     * 1. Creates primary and secondary indexes with unique resource IDs
-     * 2. Sets up the repository to return the appropriate resources
-     * 3. Verifies that all resources have unique IDs
-     *
-     * This is crucial for testing the register/open/unregister lifecycle,
-     * as it ensures each resource path maps to a unique mock index.
-     */
-    private void setupMockResources() throws HyracksDataException {
-        LOGGER.debug("Setting up mock resources");
-
-        // Setup mock resources for different datasets and partitions
-        for (int i = 0; i < NUM_DATASETS; i++) {
-            // Use datasetId starting from 101 to avoid reserved IDs (below 100)
-            int datasetId = 101 + i;
-            String datasetName = "ds" + i;
-            for (int j = 0; j < NUM_PARTITIONS; j++) {
-                // Create primary index
-                String primaryPath = getResourcePath(j, datasetName, 0, datasetName);
-                setupMockResource(primaryPath, datasetId, j, datasetId * 100 + j * 10);
-
-                // Create secondary index
-                String secondaryPath = getResourcePath(j, datasetName, 1, datasetName + "_idx");
-                setupMockResource(secondaryPath, datasetId, j, datasetId * 100 + j * 10 + 1);
-
-                createdResourcePaths.add(primaryPath);
-                createdResourcePaths.add(secondaryPath);
-            }
-        }
-
-        // Wire up the mockResourceRepository to return our mock resources
-        when(mockResourceRepository.get(anyString())).thenAnswer(invocation -> {
-            String path = invocation.getArgument(0);
-            LocalResource resource = mockResources.get(path);
-            LOGGER.debug("get({}) returning {}", path, resource);
-            return resource;
-        });
-
-        LOGGER.debug("Created {} mock resources", mockResources.size());
-    }
-
-    /**
-     * Sets up a mock resource for a specific resource path.
-     * For each resource path, this method:
-     * 1. Creates a unique MockIndex instance
-     * 2. Creates a mock DatasetLocalResource that returns the MockIndex
-     * 3. Creates a mock LocalResource with the unique resourceId
-     * 4. Sets up the IO manager to resolve the resource path
-     *
-     * The key pattern here is that each resource path must map to a unique
-     * MockIndex, and each resource must have a unique ID.
-     */
-    private void setupMockResource(String resourcePath, int datasetId, int partition, long resourceId)
-            throws HyracksDataException {
-
-        // Create and store a mock index, passing the datasetResourceMap
-        MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
-        mockIndexes.put(resourcePath, mockIndex);
-
-        // Create a mock dataset local resource
-        DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
-        when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
-        when(mockDatasetResource.getPartition()).thenReturn(partition);
-
-        // Important: We need to ensure each createInstance call returns the specific MockIndex 
-        // for this resource path
-        when(mockDatasetResource.createInstance(mockServiceContext)).thenAnswer(invocation -> {
-            return mockIndexes.get(resourcePath);
-        });
-
-        // Create a mock local resource
-        LocalResource mockLocalResource = mock(LocalResource.class);
-        when(mockLocalResource.getId()).thenReturn(resourceId);
-        when(mockLocalResource.getPath()).thenReturn(resourcePath);
-        when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
-
-        mockResources.put(resourcePath, mockLocalResource);
-
-        // Create a mock file reference
-        FileReference mockFileRef = mock(FileReference.class);
-        when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
-        when(mockIOManager.resolveAbsolutePath(resourcePath)).thenReturn(mockFileRef);
-    }
-
-    /**
-     * Generates a standardized resource path for a given partition, dataset, and index.
-     * Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/ResourceID_IndexName
-     * This ensures consistent resource path formatting throughout the tests.
-     *
-     * Note: The replication factor is always 0 in production environments, so we match that here.
-     */
-    private String getResourcePath(int partition, String datasetName, int resourceId, String indexName) {
-        // Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/IndexName
-        // Always use 0 for replication factor, and include resourceId as part of the index name
-        return String.format("%s/partition_%d/%s/%s/%s/0/%s_%s", STORAGE_DIR, partition, DB_NAME, SCOPE_NAME,
-                datasetName, resourceId, indexName);
-    }
-
-    /**
-     * Cleans up after tests by shutting down the executor service.
-     */
-    @After
-    public void tearDown() throws Exception {
-        executorService.shutdownNow();
-        executorService.awaitTermination(5, TimeUnit.SECONDS);
-        datasetLifecycleManager.stop(false, null);
-    }
-
-    /**
-     * Tests concurrent registration and opening of indexes.
-     * This test verifies that:
-     * 1. Multiple threads can concurrently register and open indexes without causing errors
-     * 2. Re-registration of an already registered index returns the same index instance
-     * 3. Indexes are properly activated when opened
-     * 4. All register and open operations properly maintain index state
-     */
-    @Test
-    public void testConcurrentRegisterIfAbsentAndOpen() throws Exception {
-        LOGGER.info("Starting testConcurrentRegisterAndOpen");
-
-        final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
-        final AtomicInteger successCount = new AtomicInteger(0);
-        final AtomicInteger errorCount = new AtomicInteger(0);
-        final Map<String, IIndex> firstRegistrations = new ConcurrentHashMap<>();
-        final List<Future<?>> futures = new ArrayList<>();
-
-        // Register half of the resources upfront
-        int preRegisteredCount = createdResourcePaths.size() / 2;
-        for (int i = 0; i < preRegisteredCount; i++) {
-            String resourcePath = createdResourcePaths.get(i);
-            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            firstRegistrations.put(resourcePath, index);
-        }
-
-        // Create threads that will randomly register/re-register and open indexes
-        for (int i = 0; i < NUM_THREADS; i++) {
-            futures.add(executorService.submit(() -> {
-                try {
-                    barrier.await(); // Ensure all threads start at the same time
-
-                    Random random = new Random();
-                    for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
-                        // Pick a random resource path
-                        String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
-
-                        // Randomly choose between register or open
-                        if (random.nextBoolean()) {
-                            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-                            assertNotNull("Index should not be null after register", index);
-
-                            // If this path was previously registered, verify it's the same instance
-                            IIndex firstInstance = firstRegistrations.get(resourcePath);
-                            if (firstInstance != null) {
-                                assertSame("Re-registration should return the same index instance", firstInstance,
-                                        index);
-                            } else {
-                                // First time registering this path, record the instance
-                                firstRegistrations.putIfAbsent(resourcePath, index);
-                            }
-
-                            successCount.incrementAndGet();
-                        } else {
-                            try {
-                                // Try to register first if not already registered
-                                if (!firstRegistrations.containsKey(resourcePath)) {
-                                    IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-                                    firstRegistrations.putIfAbsent(resourcePath, index);
-                                }
-
-                                // Now open it
-                                datasetLifecycleManager.open(resourcePath);
-                                ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
-                                assertNotNull("Index should not be null after open", index);
-                                assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
-                                successCount.incrementAndGet();
-                            } catch (Exception e) {
-                                throw e;
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    errorCount.incrementAndGet();
-                }
-            }));
-        }
-
-        // Wait for all threads to complete
-        for (Future<?> future : futures) {
-            future.get();
-        }
-
-        assertEquals("No unexpected errors should occur", 0, errorCount.get());
-        LOGGER.info("Completed testConcurrentRegisterAndOpen: {} successful operations", successCount.get());
-        assertTrue("Some operations should succeed", successCount.get() > 0);
-
-        // Verify all resources are now registered
-        for (String resourcePath : createdResourcePaths) {
-            IIndex index = firstRegistrations.get(resourcePath);
-            assertNotNull("All resources should be registered", index);
-
-            // Verify one final time that re-registration returns the same instance
-            IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            assertSame("Final re-registration should return the same index instance", index, reRegisteredIndex);
-        }
-    }
-
-    /**
-     * Tests concurrent register, open, and unregister operations.
-     * This test verifies that:
-     * 1. Multiple threads can concurrently perform all lifecycle operations (register, open, unregister)
-     * 2. Re-registration of an already registered index returns the same index instance
-     * 3. Resources can be properly unregistered after being opened and closed
-     * 4. Concurrent lifecycle operations do not interfere with each other
-     */
-    @Test
-    public void testConcurrentRegisterIfAbsentOpenAndUnregister() throws Exception {
-        LOGGER.info("Starting testConcurrentRegisterOpenAndUnregister with {} threads", NUM_THREADS);
-
-        final Map<String, IIndex> registeredInstances = new HashMap<>();
-        final AtomicInteger successCount = new AtomicInteger(0);
-        final AtomicInteger expectedErrorCount = new AtomicInteger(0);
-        final AtomicInteger unexpectedErrorCount = new AtomicInteger(0);
-
-        List<Future<?>> futures = new ArrayList<>();
-        for (int i = 0; i < NUM_THREADS; i++) {
-            futures.add(executorService.submit(() -> {
-                try {
-                    final Random random = new Random();
-                    for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
-                        // Pick a random resource path
-                        String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
-
-                        try {
-                            // Randomly choose between register, open, or unregister
-                            int op = random.nextInt(3);
-                            if (op == 0) {
-                                // Register index
-                                LOGGER.debug("Thread {} registering resource {}", Thread.currentThread().getId(),
-                                        resourcePath);
-
-                                IIndex index = datasetLifecycleManager.get(resourcePath);
-                                synchronized (registeredInstances) {
-                                    registeredInstances.put(resourcePath, index);
-                                }
-
-                                LOGGER.debug("Thread {} registered resource {}", Thread.currentThread().getId(),
-                                        resourcePath);
-                                successCount.incrementAndGet();
-                            } else if (op == 1) {
-                                // Open index
-                                LOGGER.debug("Thread {} opening resource {}", Thread.currentThread().getId(),
-                                        resourcePath);
-
-                                IIndex index;
-                                synchronized (registeredInstances) {
-                                    index = registeredInstances.get(resourcePath);
-                                }
-
-                                if (index != null) {
-                                    try {
-                                        datasetLifecycleManager.open(resourcePath);
-                                        LOGGER.debug("Thread {} opened resource {}", Thread.currentThread().getId(),
-                                                resourcePath);
-                                        successCount.incrementAndGet();
-                                    } catch (HyracksDataException e) {
-                                        if (e.getMessage() != null
-                                                && e.getMessage().contains("HYR0104: Index does not exist")) {
-                                            LOGGER.debug("Thread {} failed to open unregistered resource {}: {}",
-                                                    Thread.currentThread().getId(), resourcePath, e.getMessage());
-                                            expectedErrorCount.incrementAndGet();
-                                        } else {
-                                            LOGGER.error("Thread {} failed to open resource {}: {}",
-                                                    Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
-                                            unexpectedErrorCount.incrementAndGet();
-                                        }
-                                    }
-                                }
-                            } else {
-                                // Unregister index
-                                LOGGER.debug("Thread {} unregistering resource {}", Thread.currentThread().getId(),
-                                        resourcePath);
-
-                                try {
-                                    datasetLifecycleManager.unregister(resourcePath);
-                                    LOGGER.debug("Thread {} unregistered resource {}", Thread.currentThread().getId(),
-                                            resourcePath);
-                                    synchronized (registeredInstances) {
-                                        registeredInstances.remove(resourcePath);
-                                    }
-                                    successCount.incrementAndGet();
-                                } catch (HyracksDataException e) {
-                                    if (e.getMessage() != null
-                                            && (e.getMessage().contains("HYR0104: Index does not exist")
-                                                    || e.getMessage().contains("HYR0105: Cannot drop in-use index"))) {
-                                        LOGGER.debug("Thread {} failed to unregister resource {}: {}",
-                                                Thread.currentThread().getId(), resourcePath, e.getMessage());
-                                        expectedErrorCount.incrementAndGet();
-                                    } else {
-                                        LOGGER.error("Thread {} failed to unregister resource {}: {}",
-                                                Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
-                                        unexpectedErrorCount.incrementAndGet();
-                                    }
-                                }
-                            }
-                        } catch (Exception e) {
-                            LOGGER.error("Thread {} encountered error on resource {}: {}",
-                                    Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
-                            unexpectedErrorCount.incrementAndGet();
-                        }
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Thread {} encountered unexpected error: {}", Thread.currentThread().getId(),
-                            e.getMessage(), e);
-                    unexpectedErrorCount.incrementAndGet();
-                }
-            }));
-        }
-
-        // Wait for all threads to complete
-        for (Future<?> future : futures) {
-            future.get();
-        }
-
-        // Print final stats
-        int remainingRegistered = registeredInstances.size();
-        LOGGER.info(
-                "testConcurrentRegisterOpenAndUnregister completed - Success: {}, Expected Errors: {}, Unexpected Errors: {}, Remaining registered: {}",
-                successCount.get(), expectedErrorCount.get(), unexpectedErrorCount.get(), remainingRegistered);
-
-        // Check results
-        assertEquals("No unexpected errors should occur", 0, unexpectedErrorCount.get());
-        assertTrue("Some operations should succeed", successCount.get() > 0);
-        LOGGER.info("Expected errors occurred: {} - these are normal in concurrent environment",
-                expectedErrorCount.get());
-    }
-
-    /**
-     * Tests behavior when re-registering already registered indexes.
-     * This test verifies that:
-     * 1. Registering an index returns a valid index instance
-     * 2. Re-registering the same index returns the same instance (not a new one)
-     * 3. Concurrent re-registrations and opens work correctly
-     * 4. The identity of index instances is preserved throughout operations
-     */
-    @Test
-    public void testRegisterIfAbsentAlreadyRegisteredIndex() throws Exception {
-        LOGGER.info("Starting testRegisterAlreadyRegisteredIndex");
-
-        // Register all resources first
-        Map<String, IIndex> firstRegistrations = new HashMap<>();
-        for (String resourcePath : createdResourcePaths) {
-            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            assertNotNull("First registration should succeed", index);
-            firstRegistrations.put(resourcePath, index);
-
-            // Verify the returned index is the same as our mock index
-            MockIndex mockIndex = mockIndexes.get(resourcePath);
-            assertSame("Register should return our mock index for " + resourcePath, mockIndex, index);
-        }
-
-        // Try to register again - should return the same instances without re-registering
-        for (String resourcePath : createdResourcePaths) {
-            IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            assertNotNull("Re-registration should succeed", reRegisteredIndex);
-            // Verify we get back the same instance (no re-registration occurred)
-            assertSame("Re-registration should return the same index instance", firstRegistrations.get(resourcePath),
-                    reRegisteredIndex);
-
-            // Double check it's still our mock index
-            MockIndex mockIndex = mockIndexes.get(resourcePath);
-            assertSame("Re-register should still return our mock index for " + resourcePath, mockIndex,
-                    reRegisteredIndex);
-        }
-
-        // Now try concurrent open and re-register operations
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        final CountDownLatch completionLatch = new CountDownLatch(NUM_THREADS);
-        final AtomicInteger openSuccesses = new AtomicInteger(0);
-        final ConcurrentHashMap<String, Integer> reRegisterCounts = new ConcurrentHashMap<>();
-
-        for (int i = 0; i < NUM_THREADS; i++) {
-            final int threadId = i;
-            executorService.submit(() -> {
-                try {
-                    startLatch.await();
-
-                    // Even threads try to open, odd threads try to re-register
-                    if (threadId % 2 == 0) {
-                        String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
-                        datasetLifecycleManager.open(resourcePath);
-                        ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
-                        assertNotNull("Index should not be null after open", index);
-                        assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
-                        openSuccesses.incrementAndGet();
-                    } else {
-                        String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
-                        IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-                        assertNotNull("Re-registration should succeed", reRegisteredIndex);
-                        // Keep track of re-registrations
-                        reRegisterCounts.compute(resourcePath, (k, v) -> v == null ? 1 : v + 1);
-                        // Verify it's the same instance as the original registration
-                        assertSame("Re-registration should return the same index instance",
-                                firstRegistrations.get(resourcePath), reRegisteredIndex);
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                } finally {
-                    completionLatch.countDown();
-                }
-            });
-        }
-
-        // Start all threads
-        startLatch.countDown();
-
-        // Wait for completion
-        completionLatch.await(10, TimeUnit.SECONDS);
-
-        // Verify results
-        assertEquals("Half of threads should succeed in opening", NUM_THREADS / 2, openSuccesses.get());
-
-        // Verify that re-registration attempts occurred for each resource
-        for (String path : createdResourcePaths) {
-            // Some resources should have been re-registered multiple times
-            Integer reRegCount = reRegisterCounts.get(path);
-            if (reRegCount != null) {
-                LOGGER.info("Resource path {} was re-registered {} times", path, reRegCount);
-            }
-        }
-
-        LOGGER.info("Completed testRegisterAlreadyRegisteredIndex: {} successful opens", openSuccesses.get());
-    }
-
-    /**
-     * Tests concurrent opening of the same index by multiple threads.
-     * This test verifies that:
-     * 1. Multiple threads can concurrently open the same index without errors
-     * 2. The index is properly activated after being opened
-     * 3. Concurrent opens do not interfere with each other
-     */
-    @Test
-    public void testConcurrentOpenSameIndex() throws Exception {
-        LOGGER.info("Starting testConcurrentOpenSameIndex");
-
-        final int THREADS = 10;
-        final String RESOURCE_PATH = createdResourcePaths.get(0); // First created resource path
-        final CountDownLatch startLatch = new CountDownLatch(1);
-        final CountDownLatch completionLatch = new CountDownLatch(THREADS);
-        final AtomicInteger errors = new AtomicInteger(0);
-
-        // Register the index first - REQUIRED before open
-        datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
-
-        // Create threads that will all open the same index
-        for (int i = 0; i < THREADS; i++) {
-            executorService.submit(() -> {
-                try {
-                    startLatch.await();
-                    datasetLifecycleManager.open(RESOURCE_PATH);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    errors.incrementAndGet();
-                } finally {
-                    completionLatch.countDown();
-                }
-            });
-        }
-
-        // Start all threads simultaneously
-        startLatch.countDown();
-
-        // Wait for all threads to complete
-        completionLatch.await(10, TimeUnit.SECONDS);
-
-        assertEquals("No errors should occur when opening the same index concurrently", 0, errors.get());
-
-        // Verify the index is actually open
-        ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(RESOURCE_PATH);
-        assertNotNull("Index should be retrievable", index);
-        assertTrue("Index should be activated", ((MockIndex) index).isActivated());
-
-        LOGGER.info("Completed testConcurrentOpenSameIndex: index activated={}", ((MockIndex) index).isActivated());
-    }
-
-    /**
-     * Tests concurrent closing of an index after it has been opened.
-     * This test verifies that:
-     * 1. An index can be closed after being opened
-     * 2. Multiple threads can attempt to close the same index
-     * 3. The index is properly deactivated after being closed
-     */
-    @Test
-    public void testConcurrentCloseAfterOpen() throws Exception {
-        LOGGER.info("Starting testConcurrentCloseAfterOpen");
-
-        final String RESOURCE_PATH = createdResourcePaths.get(1); // Second created resource path
-        final CountDownLatch openLatch = new CountDownLatch(1);
-        final CountDownLatch closeLatch = new CountDownLatch(1);
-        final AtomicBoolean openSuccess = new AtomicBoolean(false);
-        final AtomicBoolean closeSuccess = new AtomicBoolean(false);
-
-        // Register the index first
-        datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
-
-        // Thread to open the index
-        executorService.submit(() -> {
-            try {
-                datasetLifecycleManager.open(RESOURCE_PATH);
-                openSuccess.set(true);
-                openLatch.countDown();
-
-                // Wait a bit to simulate work with the open index
-                Thread.sleep(100);
-            } catch (Exception e) {
-                e.printStackTrace();
-                fail("Open operation failed: " + e.getMessage());
-            }
-        });
-
-        // Thread to close the index after it's opened
-        executorService.submit(() -> {
-            try {
-                openLatch.await(); // Wait for open to complete
-                datasetLifecycleManager.close(RESOURCE_PATH);
-                closeSuccess.set(true);
-                closeLatch.countDown();
-            } catch (Exception e) {
-                e.printStackTrace();
-                fail("Close operation failed: " + e.getMessage());
-            }
-        });
-
-        // Wait for operations to complete
-        closeLatch.await(5, TimeUnit.SECONDS);
-
-        assertTrue("Open operation should succeed", openSuccess.get());
-        assertTrue("Close operation should succeed", closeSuccess.get());
-
-        LOGGER.info("Completed testConcurrentCloseAfterOpen: openSuccess={}, closeSuccess={}", openSuccess.get(),
-                closeSuccess.get());
-    }
-
-    /**
-     * Tests the identity and uniqueness of mock objects and resources.
-     * This test verifies that:
-     * 1. Each resource path has a unique MockIndex instance
-     * 2. Each LocalResource has a unique ID
-     * 3. The createInstance method returns the correct MockIndex for each resource path
-     * 4. Registration returns the expected MockIndex instance
-     *
-     * This test is crucial for ensuring the test infrastructure itself is correctly set up.
-     */
-    @Test
-    public void testMockIndexIdentity() throws HyracksDataException {
-        LOGGER.info("Starting testMockIndexIdentity");
-
-        // Verify that the mockIndexes map is correctly populated
-        assertTrue("Mock indexes should be created", !mockIndexes.isEmpty());
-        assertEquals("Should have created mocks for all resource paths", createdResourcePaths.size(),
-                mockIndexes.size());
-
-        // Verify each local resource has a unique ID
-        Set<Long> resourceIds = new HashSet<>();
-        for (LocalResource resource : mockResources.values()) {
-            long id = resource.getId();
-            if (!resourceIds.add(id)) {
-                fail("Duplicate resource ID found: " + id + " for path: " + resource.getPath());
-            }
-        }
-
-        // For each resource path, verify the mock setup
-        for (String resourcePath : createdResourcePaths) {
-            // Check if we have a mock index for this path
-            MockIndex expectedMockIndex = mockIndexes.get(resourcePath);
-            assertNotNull("Should have a mock index for " + resourcePath, expectedMockIndex);
-
-            // Get the local resource
-            LocalResource lr = mockResourceRepository.get(resourcePath);
-            assertNotNull("Should have a local resource for " + resourcePath, lr);
-
-            // Print resource ID for verification
-            LOGGER.info("Resource path: {}, ID: {}", resourcePath, lr.getId());
-
-            // Get the dataset resource
-            DatasetLocalResource datasetResource = (DatasetLocalResource) lr.getResource();
-            assertNotNull("Should have a dataset resource for " + resourcePath, datasetResource);
-
-            // Call createInstance and verify we get our mock index
-            IIndex createdIndex = datasetResource.createInstance(mockServiceContext);
-            assertSame("createInstance should return our mock index for " + resourcePath, expectedMockIndex,
-                    createdIndex);
-
-            // Now register and verify we get the same mock index
-            IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            assertSame("Registered index should be our mock index for " + resourcePath, expectedMockIndex,
-                    registeredIndex);
-
-            // Confirm the reference equality of these objects
-            LOGGER.info("Path: {}", resourcePath);
-            LOGGER.info("  Expected mock: {}", System.identityHashCode(expectedMockIndex));
-            LOGGER.info("  Created instance: {}", System.identityHashCode(createdIndex));
-            LOGGER.info("  Registered instance: {}", System.identityHashCode(registeredIndex));
-        }
-
-        LOGGER.info("Completed testMockIndexIdentity: verified {} resources", createdResourcePaths.size());
-    }
-
-    /**
-     * Tests that unregistering indexes works correctly, but only after they have been properly registered and opened.
-     * This test verifies that:
-     * 1. Indexes can be registered and opened successfully
-     * 2. All indexes are properly activated after opening
-     * 3. Indexes can be unregistered after being registered and opened
-     * 4. After unregistering, the indexes are no longer retrievable
-     * 5. The correct lifecycle sequence (register → open → unregister) is enforced
-     */
-    @Test
-    public void testUnregisterAfterRegisterIfAbsentAndOpen() throws Exception {
-        LOGGER.info("Starting testUnregisterAfterRegisterAndOpen");
-
-        Map<String, IIndex> registeredIndexes = new HashMap<>();
-        int totalIndexes = createdResourcePaths.size();
-        AtomicInteger successfulUnregisters = new AtomicInteger(0);
-
-        // Step 1: Register and open all indexes sequentially
-        for (String resourcePath : createdResourcePaths) {
-            LOGGER.debug("Registering and opening: {}", resourcePath);
-
-            // Register the index
-            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            assertNotNull("Index should be registered successfully", index);
-            registeredIndexes.put(resourcePath, index);
-
-            // After registration, we should have a DatasetResource
-            LocalResource resource = mockResources.get(resourcePath);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-            int datasetId = datasetLocalResource.getDatasetId();
-            int partition = datasetLocalResource.getPartition();
-
-            // Check if we have a DatasetResource in our map
-            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-            if (datasetResource != null) {
-                LOGGER.debug("Found DatasetResource for dataset {}", datasetId);
-
-                // See if there's an operation tracker for this partition already
-                ILSMOperationTracker existingTracker = null;
-                try {
-                    // Use reflection to get the datasetPrimaryOpTrackers map
-                    Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
-                    opTrackersField.setAccessible(true);
-                    Map<Integer, ILSMOperationTracker> opTrackers =
-                            (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
-                    // Check if we already have a tracker for this partition
-                    existingTracker = opTrackers.get(partition);
-                    LOGGER.debug("Existing tracker for partition {}: {}", partition, existingTracker);
-                } catch (Exception e) {
-                    LOGGER.error("Failed to access opTrackers field via reflection", e);
-                }
-            } else {
-                LOGGER.debug("No DatasetResource found for dataset {}", datasetId);
-            }
-
-            // Open the index
-            datasetLifecycleManager.open(resourcePath);
-
-            // Verify index is activated
-            MockIndex mockIndex = (MockIndex) index;
-            assertTrue("Index should be activated after opening: " + resourcePath, mockIndex.isActivated());
-
-            // Verify it's retrievable
-            long resourceId = resource.getId();
-            IIndex retrievedIndex = datasetLifecycleManager.getIndex(datasetId, resourceId);
-            assertSame("Retrieved index should be the same instance", index, retrievedIndex);
-        }
-
-        LOGGER.info("All {} indexes registered and opened successfully", totalIndexes);
-
-        // Step 2: Close and then unregister each index
-        for (String resourcePath : createdResourcePaths) {
-            LOGGER.debug("Closing and unregistering: {}", resourcePath);
-
-            // Verify the index is activated before closing
-            IIndex index = registeredIndexes.get(resourcePath);
-            MockIndex mockIndex = (MockIndex) index;
-            assertTrue("Index should be activated before closing: " + resourcePath, mockIndex.isActivated());
-
-            // Get resource information before closing
-            LocalResource resource = mockResources.get(resourcePath);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-            int datasetId = datasetLocalResource.getDatasetId();
-            int partition = datasetLocalResource.getPartition();
-
-            // Close the index first to ensure reference count goes to 0
-            LOGGER.debug("Closing index: {}", resourcePath);
-            datasetLifecycleManager.close(resourcePath);
-
-            // Brief pause to allow any asynchronous operations to complete
-            Thread.sleep(50);
-
-            // Get the operation tracker and verify it has 0 active operations
-            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-            ILSMOperationTracker opTracker = mockIndex.getOperationTracker();
-
-            LOGGER.debug("Before unregister: Resource path={}, datasetId={}, partition={}, tracker={}", resourcePath,
-                    datasetId, partition, opTracker);
-
-            // Unregister the index after it's closed
-            LOGGER.debug("Unregistering index: {}", resourcePath);
-            try {
-                datasetLifecycleManager.unregister(resourcePath);
-                successfulUnregisters.incrementAndGet();
-                LOGGER.debug("Successfully unregistered index: {}", resourcePath);
-            } catch (Exception e) {
-                LOGGER.error("Failed to unregister index: {}", resourcePath, e);
-                fail("Failed to unregister index: " + resourcePath + ": " + e.getMessage());
-            }
-
-            // Verify the index is no longer retrievable
-            try {
-                LocalResource resourceAfterUnregister = mockResources.get(resourcePath);
-                long resourceIdAfterUnregister = resourceAfterUnregister.getId();
-                IIndex retrievedIndexAfterUnregister =
-                        datasetLifecycleManager.getIndex(datasetId, resourceIdAfterUnregister);
-                Assert.assertNull("Index should not be retrievable after unregister: " + resourcePath,
-                        retrievedIndexAfterUnregister);
-            } catch (HyracksDataException e) {
-                // This is also an acceptable outcome if getIndex throws an exception for unregistered indexes
-                LOGGER.debug("Expected exception when retrieving unregistered index: {}", e.getMessage());
-            }
-        }
-
-        assertEquals("All indexes should be unregistered", totalIndexes, successfulUnregisters.get());
-        LOGGER.info("Completed testUnregisterAfterRegisterAndOpen: successfully unregistered {} indexes",
-                successfulUnregisters.get());
-    }
-
-    /**
-     * Tests that only one primary operation tracker is created per partition per dataset.
-     * This test verifies that:
-     * 1. When multiple indexes for the same dataset and partition are registered, they share the same operation tracker
-     * 2. Different partitions of the same dataset have different operation trackers
-     * 3. Different datasets have completely separate operation trackers
-     */
-    @Test
-    public void testPrimaryOperationTrackerSingularity() throws Exception {
-        LOGGER.info("Starting testPrimaryOperationTrackerSingularity");
-
-        // Maps to track operation trackers by dataset and partition
-        Map<Integer, Map<Integer, Object>> datasetToPartitionToTracker = new HashMap<>();
-        Map<String, IIndex> registeredIndexes = new HashMap<>();
-
-        // Step 1: Register all indexes first
-        for (String resourcePath : createdResourcePaths) {
-            LOGGER.debug("Registering index: {}", resourcePath);
-
-            // Register the index
-            IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
-            assertNotNull("Index should be registered successfully", index);
-            registeredIndexes.put(resourcePath, index);
-
-            // Get dataset and partition information
-            LocalResource resource = mockResources.get(resourcePath);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-            int datasetId = datasetLocalResource.getDatasetId();
-            int partition = datasetLocalResource.getPartition();
-
-            // Check if we have a DatasetResource in our map
-            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-            assertNotNull("DatasetResource should be created for dataset " + datasetId, datasetResource);
-
-            // At this point, operation tracker might not exist yet since it's created lazily during open()
-            try {
-                // Use reflection to get the datasetPrimaryOpTrackers map
-                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
-                opTrackersField.setAccessible(true);
-                Map<Integer, ILSMOperationTracker> opTrackers =
-                        (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
-                // Check if tracker already exists (might be null)
-                Object existingTracker = opTrackers.get(partition);
-                LOGGER.debug("Before open(): Tracker for dataset {} partition {}: {}", datasetId, partition,
-                        existingTracker);
-            } catch (Exception e) {
-                LOGGER.error("Failed to access opTrackers field via reflection", e);
-            }
-        }
-
-        // Step 2: Open all indexes to trigger operation tracker creation
-        for (String resourcePath : createdResourcePaths) {
-            LOGGER.debug("Opening index: {}", resourcePath);
-
-            // Open the index to trigger operation tracker creation
-            datasetLifecycleManager.open(resourcePath);
-
-            // Get dataset and partition information
-            LocalResource resource = mockResources.get(resourcePath);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-            int datasetId = datasetLocalResource.getDatasetId();
-            int partition = datasetLocalResource.getPartition();
-
-            // Now the operation tracker should exist
-            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-            Object opTracker = null;
-            try {
-                // Use reflection to get the datasetPrimaryOpTrackers map
-                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
-                opTrackersField.setAccessible(true);
-                Map<Integer, ILSMOperationTracker> opTrackers =
-                        (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
-                // Get the tracker for this partition
-                opTracker = opTrackers.get(partition);
-                assertNotNull("Operation tracker should exist after open() for partition " + partition, opTracker);
-
-                LOGGER.debug("After open(): Found tracker for dataset {} partition {}: {}", datasetId, partition,
-                        System.identityHashCode(opTracker));
-            } catch (Exception e) {
-                LOGGER.error("Failed to access opTrackers field via reflection", e);
-                fail("Failed to access operation trackers: " + e.getMessage());
-            }
-
-            // Store the tracker in our map for later comparison
-            datasetToPartitionToTracker.computeIfAbsent(datasetId, k -> new HashMap<>()).putIfAbsent(partition,
-                    opTracker);
-        }
-
-        // Step 3: Create and register secondary indexes for each dataset/partition
-        List<String> secondaryPaths = new ArrayList<>();
-        for (String resourcePath : createdResourcePaths) {
-            // Create a "secondary index" path by appending "-secondary" to the resource path
-            String secondaryPath = resourcePath + "-secondary";
-            secondaryPaths.add(secondaryPath);
-
-            // Create a mock resource for this secondary index
-            LocalResource originalResource = mockResources.get(resourcePath);
-            DatasetLocalResource originalDatasetLocalResource = (DatasetLocalResource) originalResource.getResource();
-            int datasetId = originalDatasetLocalResource.getDatasetId();
-            int partition = originalDatasetLocalResource.getPartition();
-
-            // Set up a new mock resource with the same dataset and partition but a different path
-            setupMockResource(secondaryPath, datasetId, partition, originalResource.getId() + 10000);
-
-            LOGGER.debug("Registering secondary index: {}", secondaryPath);
-
-            // Register the secondary index
-            IIndex secondaryIndex = datasetLifecycleManager.registerIfAbsent(secondaryPath, null);
-            assertNotNull("Secondary index should be registered successfully", secondaryIndex);
-            registeredIndexes.put(secondaryPath, secondaryIndex);
-        }
-
-        // Step 4: Open the secondary indexes to trigger operation tracker reuse
-        for (String secondaryPath : secondaryPaths) {
-            LOGGER.debug("Opening secondary index: {}", secondaryPath);
-
-            // Open the secondary index
-            datasetLifecycleManager.open(secondaryPath);
-
-            // Get dataset and partition information
-            LocalResource resource = mockResources.get(secondaryPath);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-            int datasetId = datasetLocalResource.getDatasetId();
-            int partition = datasetLocalResource.getPartition();
-
-            // Get the operation tracker after opening
-            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-            Object secondaryOpTracker = null;
-            try {
-                // Use reflection to get the datasetPrimaryOpTrackers map
-                Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
-                opTrackersField.setAccessible(true);
-                Map<Integer, ILSMOperationTracker> opTrackers =
-                        (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
-                // Get the tracker for this partition
-                secondaryOpTracker = opTrackers.get(partition);
-                assertNotNull("Operation tracker should exist for secondary index partition " + partition,
-                        secondaryOpTracker);
-
-                LOGGER.debug("After opening secondary: Found tracker for dataset {} partition {}: {}", datasetId,
-                        partition, System.identityHashCode(secondaryOpTracker));
-            } catch (Exception e) {
-                LOGGER.error("Failed to access opTrackers field via reflection", e);
-                fail("Failed to access operation trackers: " + e.getMessage());
-            }
-
-            // Verify this is the same tracker as the one used by the primary index
-            Object originalTracker = datasetToPartitionToTracker.get(datasetId).get(partition);
-            assertSame("Operation tracker should be reused for the same dataset/partition", originalTracker,
-                    secondaryOpTracker);
-        }
-
-        // Step 5: Verify that different partitions of the same dataset have different operation trackers
-        for (Map.Entry<Integer, Map<Integer, Object>> datasetEntry : datasetToPartitionToTracker.entrySet()) {
-            int datasetId = datasetEntry.getKey();
-            Map<Integer, Object> partitionTrackers = datasetEntry.getValue();
-
-            if (partitionTrackers.size() > 1) {
-                LOGGER.debug("Verifying different trackers for different partitions in dataset {}", datasetId);
-
-                // Collect all the tracker instances for this dataset
-                Set<Object> uniqueTrackers = new HashSet<>(partitionTrackers.values());
-
-                // The number of unique trackers should equal the number of partitions
-                assertEquals("Each partition should have its own operation tracker", partitionTrackers.size(),
-                        uniqueTrackers.size());
-            }
-        }
-
-        // Step 6: Verify that different datasets have different operation trackers for the same partition
-        if (datasetToPartitionToTracker.size() > 1) {
-            for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
-                final int partitionId = partition;
-
-                // Collect all trackers for this partition across different datasets
-                List<Object> trackersForPartition = datasetToPartitionToTracker.entrySet().stream()
-                        .filter(entry -> entry.getValue().containsKey(partitionId))
-                        .map(entry -> entry.getValue().get(partitionId)).collect(Collectors.toList());
-
-                if (trackersForPartition.size() > 1) {
-                    LOGGER.debug("Verifying different trackers across datasets for partition {}", partitionId);
-
-                    // All trackers should be unique (no sharing between datasets)
-                    Set<Object> uniqueTrackers = new HashSet<>(trackersForPartition);
-                    assertEquals("Each dataset should have its own operation tracker for partition " + partitionId,
-                            trackersForPartition.size(), uniqueTrackers.size());
-                }
-            }
-        }
-
-        // Step 7: Clean up - close all indexes
-        for (String path : new ArrayList<>(registeredIndexes.keySet())) {
-            try {
-                datasetLifecycleManager.close(path);
-            } catch (Exception e) {
-                LOGGER.error("Error closing index {}", path, e);
-            }
-        }
-
-        LOGGER.info(
-                "Completed testPrimaryOperationTrackerSingularity: verified unique trackers for each dataset/partition combination");
-    }
-
-    /**
-     * Tests that resources cannot be opened after unregistering unless re-registered first.
-     * This test verifies:
-     * 1. Resources must be registered before they can be opened
-     * 2. After unregistering, resources cannot be opened until re-registered
-     * 3. Re-registration of an unregistered resource works correctly
-     */
-    @Test
-    public void testCannotOpenUnregisteredResource() throws Exception {
-        LOGGER.info("Starting testCannotOpenUnregisteredResource");
-
-        String resourcePath = createdResourcePaths.get(0);
-
-        // Register resource first
-        IIndex index = registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully", index);
-        LOGGER.debug("Registered resource {}", resourcePath);
-
-        // Unregister it
-        datasetLifecycleManager.unregister(resourcePath);
-        LOGGER.debug("Unregistered resource {}", resourcePath);
-
-        // Now try to open it - should fail
-        try {
-            datasetLifecycleManager.open(resourcePath);
-            fail("Should not be able to open an unregistered resource");
-        } catch (HyracksDataException e) {
-            LOGGER.debug("Caught expected exception: {}", e.getMessage());
-            assertTrue("Exception should mention index not existing", e.getMessage().contains("does not exist"));
-        }
-
-        // Re-register should work
-        MockIndex mockIndex = mockIndexes.get(resourcePath);
-        IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
-        assertNotNull("Index should be re-registered successfully", reRegisteredIndex);
-        LOGGER.debug("Re-registered resource {}", resourcePath);
-
-        // Now open should work
-        datasetLifecycleManager.open(resourcePath);
-        LOGGER.debug("Successfully opened resource after re-registering");
-
-        LOGGER.info("Completed testCannotOpenUnregisteredResource");
-    }
-
-    /**
-     * Tests the full lifecycle of operation trackers to ensure they're properly created,
-     * maintained throughout the index lifecycle, and cleaned up at the appropriate time.
-     * This test verifies that:
-     * 1. Operation trackers are created when indexes are opened for the first time
-     * 2. Operation trackers remain stable during index usage
-     * 3. Operation trackers are properly shared among indexes of the same dataset/partition
-     * 4. Operation trackers are not prematurely nullified or cleared
-     * 5. Operation trackers are correctly removed when all indexes of a dataset/partition are unregistered
-     */
-    @Test
-    public void testOperationTrackerLifecycle() throws Exception {
-        // Register primary index
-        String dsName = "ds0";
-        int dsId = 101; // Updated from 0 to 101 to match our new datasetId scheme
-        int partition = 0;
-        String primaryIndexName = dsName;
-        String secondaryIndexName = dsName + "_idx";
-        String primaryResourcePath = getResourcePath(partition, dsName, 0, primaryIndexName);
-        String secondaryResourcePath = getResourcePath(partition, dsName, 1, secondaryIndexName);
-
-        LOGGER.debug("Registering primary index at {}", primaryResourcePath);
-        MockIndex mockPrimaryIndex = mockIndexes.get(primaryResourcePath);
-        IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(primaryResourcePath, mockPrimaryIndex);
-        assertNotNull("Index should be registered successfully", registeredIndex);
-
-        // Get the dataset resource and check if it has been created
-        DatasetResource datasetResource = datasetResourceMap.get(dsId);
-        assertNotNull("Dataset resource should be created during registration", datasetResource);
-
-        // Verify there's no operation tracker before opening
-        ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
-        LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
-
-        // Open primary index which should create and register an operation tracker
-        LOGGER.debug("Opening primary index at {}", primaryResourcePath);
-        datasetLifecycleManager.open(primaryResourcePath);
-
-        // Verify operation tracker exists after opening
-        ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
-        LOGGER.debug("Operation tracker after primary open: {}", trackerAfterOpen);
-        assertNotNull("Operation tracker should be created after opening primary index", trackerAfterOpen);
-
-        // Verify the tracker is from our mock index
-        MockIndex mockIndex = mockIndexes.get(primaryResourcePath);
-        assertTrue("Mock index should be activated after open", mockIndex.isActivated());
-
-        // Compare with the mock tracker from the index
-        ILSMOperationTracker mockTracker = mockIndex.getOperationTracker();
-        LOGGER.debug("Mock tracker from index: {}", mockTracker);
-        assertEquals("Operation tracker should be the one from the mock index", mockTracker, trackerAfterOpen);
-
-        // Open secondary index which should reuse the same operation tracker
-        LOGGER.debug("Registering secondary index at {}", secondaryResourcePath);
-        MockIndex mockSecondaryIndex = mockIndexes.get(secondaryResourcePath);
-        datasetLifecycleManager.registerIfAbsent(secondaryResourcePath, mockSecondaryIndex);
-
-        LOGGER.debug("Opening secondary index at {}", secondaryResourcePath);
-        datasetLifecycleManager.open(secondaryResourcePath);
-
-        // Verify operation tracker is still the same after opening secondary
-        ILSMOperationTracker trackerAfterSecondaryOpen = getOperationTracker(datasetResource, partition);
-        LOGGER.debug("Operation tracker after secondary open: {}", trackerAfterSecondaryOpen);
-        assertNotNull("Operation tracker should still exist after opening secondary index", trackerAfterSecondaryOpen);
-        assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterSecondaryOpen);
-
-        // Close primary index - should keep the tracker as secondary is still open
-        LOGGER.debug("Closing primary index at {}", primaryResourcePath);
-        datasetLifecycleManager.close(primaryResourcePath);
-
-        // Verify operation tracker still exists
-        ILSMOperationTracker trackerAfterPrimaryClose = getOperationTracker(datasetResource, partition);
-        LOGGER.debug("Operation tracker after primary close: {}", trackerAfterPrimaryClose);
-        assertNotNull("Operation tracker should still exist after closing primary index", trackerAfterPrimaryClose);
-        assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterPrimaryClose);
-
-        // Close secondary index - should remove the tracker as all indexes are closed
-        LOGGER.debug("Closing secondary index at {}", secondaryResourcePath);
-        datasetLifecycleManager.close(secondaryResourcePath);
-
-        // Verify operation tracker is removed
-        ILSMOperationTracker trackerAfterSecondaryClose = getOperationTracker(datasetResource, partition);
-        LOGGER.debug("Operation tracker after secondary close: {}", trackerAfterSecondaryClose);
-
-        // Unregister indexes
-        LOGGER.debug("Unregistering primary index at {}", primaryResourcePath);
-        datasetLifecycleManager.unregister(primaryResourcePath);
-
-        LOGGER.debug("Unregistering secondary index at {}", secondaryResourcePath);
-        datasetLifecycleManager.unregister(secondaryResourcePath);
-
-        // Verify dataset resource is removed
-        DatasetResource datasetResourceAfterUnregister = datasetResourceMap.get(dsId);
-        LOGGER.debug("Dataset resource after unregister: {}", datasetResourceAfterUnregister);
-    }
-
-    /**
-     * Helper method to get the operation tracker for a dataset/partition.
-     * Uses reflection to access the private datasetPrimaryOpTrackers map in DatasetResource.
-     */
-    private ILSMOperationTracker getOperationTracker(DatasetResource datasetResource, int partition) {
-        try {
-            // Access the datasetPrimaryOpTrackers map through reflection
-            Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
-            opTrackersField.setAccessible(true);
-            @SuppressWarnings("unchecked")
-            Map<Integer, ILSMOperationTracker> opTrackers =
-                    (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
-            // Return the operation tracker for this partition, if it exists
-            if (opTrackers != null) {
-                ILSMOperationTracker tracker = opTrackers.get(partition);
-                if (tracker != null) {
-                    // Make sure we're getting a PrimaryIndexOperationTracker
-                    if (!(tracker instanceof PrimaryIndexOperationTracker)) {
-                        LOGGER.warn("Tracker is not a PrimaryIndexOperationTracker: {}", tracker.getClass().getName());
-                    }
-                }
-                return tracker;
-            }
-        } catch (Exception e) {
-            LOGGER.error("Failed to get operation tracker via reflection", e);
-        }
-        return null;
-    }
-
-    /**
-     * Tests that operation trackers are properly attached to indexes and survive open and close operations.
-     * This test verifies:
-     * 1. Operation trackers are properly created and associated with indexes
-     * 2. Operation trackers remain valid during the index lifecycle
-     * 3. Multiple opens and closes don't invalidate the operation tracker
-     */
-    @Test
-    public void testOperationTrackerAttachment() throws Exception {
-        LOGGER.info("Starting testOperationTrackerAttachment");
-
-        // Choose a specific resource path to test with
-        String resourcePath = createdResourcePaths.get(0);
-
-        // Get dataset and partition information for this resource
-        LocalResource resource = mockResources.get(resourcePath);
-        DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-        int datasetId = datasetLocalResource.getDatasetId();
-        int partition = datasetLocalResource.getPartition();
-
-        // Register the resource first using our helper
-        IIndex index = registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully", index);
-        LOGGER.debug("Registered resource {}", resourcePath);
-
-        // Get the dataset resource
-        DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-        assertNotNull("DatasetResource should exist after registration", datasetResource);
-
-        // Verify no operation tracker exists before opening
-        ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
-        LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
-
-        // Open the index - this should create and attach an operation tracker
-        datasetLifecycleManager.open(resourcePath);
-        LOGGER.debug("Opened resource {}", resourcePath);
-
-        // Verify the operation tracker exists and is attached to the MockIndex
-        ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
-        assertNotNull("Operation tracker should exist after open", trackerAfterOpen);
-        LOGGER.debug("Operation tracker after open: {}", trackerAfterOpen);
-
-        // Get the mock index and verify its tracker matches
-        MockIndex mockIndex = mockIndexes.get(resourcePath);
-        assertTrue("Mock index should be activated", mockIndex.isActivated());
-
-        // Verify the mock index has the same operation tracker
-        ILSMOperationTracker indexTracker = mockIndex.getOperationTracker();
-        assertSame("Mock index should have the same operation tracker", trackerAfterOpen, indexTracker);
-
-        // Close and unregister for cleanup
-        datasetLifecycleManager.close(resourcePath);
-        datasetLifecycleManager.unregister(resourcePath);
-
-        LOGGER.info("Completed testOperationTrackerAttachment");
-    }
-
-    /**
-     * A mock implementation of ILSMIndex for testing
-     */
-    private static class MockIndex implements ILSMIndex {
-        private final String resourcePath;
-        private final int datasetId;
-        private boolean activated = false;
-        // Create a mock of the specific PrimaryIndexOperationTracker class instead of the generic interface
-        private PrimaryIndexOperationTracker mockTracker = mock(PrimaryIndexOperationTracker.class);
-        private final Map<Integer, DatasetResource> datasetResourceMap;
-
-        public MockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
-            this.resourcePath = resourcePath;
-            this.datasetId = datasetId;
-            this.datasetResourceMap = datasetResourceMap;
-        }
-
-        @Override
-        public void create() throws HyracksDataException {
-
-        }
-
-        @Override
-        public void activate() {
-            LOGGER.debug("Activating {}", this);
-            activated = true;
-        }
-
-        @Override
-        public void clear() throws HyracksDataException {
-
-        }
-
-        @Override
-        public void deactivate() throws HyracksDataException {
-            LOGGER.debug("Deactivating {}", this);
-            activated = false;
-        }
-
-        @Override
-        public void destroy() throws HyracksDataException {
-
-        }
-
-        @Override
-        public void purge() throws HyracksDataException {
-
-        }
-
-        @Override
-        public void deactivate(boolean flushOnExit) {
-            LOGGER.debug("Deactivating {} with flushOnExit={}", this, flushOnExit);
-            activated = false;
-        }
-
-        @Override
-        public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public void validate() throws HyracksDataException {
-
-        }
-
-        @Override
-        public IBufferCache getBufferCache() {
-            return null;
-        }
-
-        @Override
-        public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
-                boolean checkIfEmptyIndex, IPageWriteCallback callback) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public int getNumOfFilterFields() {
-            return 0;
-        }
-
-        public boolean isActivated() {
-            return activated;
-        }
-
-        @Override
-        public String toString() {
-            // Ensure this matches exactly the resourcePath used for registration
-            return resourcePath;
-        }
-
-        @Override
-        public boolean isCurrentMutableComponentEmpty() {
-            return true;
-        }
-
-        @Override
-        public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
-                IReplicationJob.ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
-
-        }
-
-        @Override
-        public boolean isMemoryComponentsAllocated() {
-            return false;
-        }
-
-        @Override
-        public void allocateMemoryComponents() throws HyracksDataException {
-
-        }
-
-        @Override
-        public ILSMMemoryComponent getCurrentMemoryComponent() {
-            return null;
-        }
-
-        @Override
-        public int getCurrentMemoryComponentIndex() {
-            return 0;
-        }
-
-        @Override
-        public List<ILSMMemoryComponent> getMemoryComponents() {
-            return List.of();
-        }
-
-        @Override
-        public boolean isDurable() {
-            return false;
-        }
-
-        @Override
-        public void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
-
-        }
-
-        @Override
-        public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public int getNumberOfAllMemoryComponents() {
-            return 0;
-        }
-
-        @Override
-        public ILSMHarness getHarness() {
-            return null;
-        }
-
-        @Override
-        public String getIndexIdentifier() {
-            return "";
-        }
-
-        @Override
-        public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
-                boolean checkIfEmptyIndex, Map<String, Object> parameters) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public void resetCurrentComponentIndex() {
-
-        }
-
-        @Override
-        public IIndexDiskCacheManager getDiskCacheManager() {
-            return null;
-        }
-
-        @Override
-        public void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException {
-
-        }
-
-        @Override
-        public boolean isAtomic() {
-            return false;
-        }
-
-        @Override
-        public void commit() throws HyracksDataException {
-
-        }
-
-        @Override
-        public void abort() throws HyracksDataException {
-
-        }
-
-        @Override
-        public ILSMMergePolicy getMergePolicy() {
-            return null;
-        }
-
-        @Override
-        public ILSMOperationTracker getOperationTracker() {
-            // This is the key method that DatasetLifecycleManager calls during open()
-            // to get the operation tracker and register it
-
-            LOGGER.debug("getOperationTracker() called for index: {}", resourcePath);
-
-            // Get dataset ID and partition from the resource path
-            int partition = -1;
-
-            // Extract partition from resource path (storage/partition_X/...)
-            String[] parts = resourcePath.split("/");
-            for (int i = 0; i < parts.length; i++) {
-                if (parts[i].startsWith("partition_")) {
-                    try {
-                        partition = Integer.parseInt(parts[i].substring("partition_".length()));
-                        break;
-                    } catch (NumberFormatException e) {
-                        LOGGER.warn("Failed to parse partition number from {}", parts[i]);
-                    }
-                }
-            }
-
-            if (partition != -1) {
-                // Find the dataset resource from our map
-                DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-                if (datasetResource != null) {
-                    try {
-                        // Access the datasetPrimaryOpTrackers map through reflection
-                        Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
-                        opTrackersField.setAccessible(true);
-                        @SuppressWarnings("unchecked")
-                        Map<Integer, ILSMOperationTracker> opTrackers =
-                                (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
-
-                        // Manual registration - simulate what DatasetLifecycleManager would do
-                        if (opTrackers != null && !opTrackers.containsKey(partition)) {
-                            LOGGER.debug(
-                                    "Directly adding tracker to datasetPrimaryOpTrackers for dataset {}, partition {}: {}",
-                                    datasetId, partition, mockTracker);
-                            opTrackers.put(partition, mockTracker);
-                        }
-                    } catch (Exception e) {
-                        LOGGER.error("Failed to access datasetPrimaryOpTrackers via reflection", e);
-                    }
-                }
-            }
-
-            return mockTracker;
-        }
-
-        @Override
-        public ILSMIOOperationCallback getIOOperationCallback() {
-            return null;
-        }
-
-        @Override
-        public List<ILSMDiskComponent> getDiskComponents() {
-            return List.of();
-        }
-
-        @Override
-        public boolean isPrimaryIndex() {
-            return false;
-        }
-
-        @Override
-        public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
-
-        }
-
-        @Override
-        public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
-                throws HyracksDataException {
-
-        }
-
-        @Override
-        public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
-
-        }
-
-        @Override
-        public ILSMIOOperation createFlushOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public ILSMIOOperation createMergeOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
-            return null;
-        }
-
-        @Override
-        public void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException {
-
-        }
-
-        @Override
-        public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
-
-        }
-
-        @Override
-        public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
-                throws HyracksDataException {
-
-        }
-
-        @Override
-        public void changeMutableComponent() {
-
-        }
-
-        @Override
-        public void changeFlushStatusForCurrentMutableCompoent(boolean needsFlush) {
-
-        }
-
-        @Override
-        public boolean hasFlushRequestForCurrentMutableComponent() {
-            return false;
-        }
-
-        @Override
-        public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
-
-        }
-
-        @Override
-        public List<ILSMDiskComponent> getInactiveDiskComponents() {
-            return List.of();
-        }
-
-        @Override
-        public void addInactiveDiskComponent(ILSMDiskComponent diskComponent) {
-
-        }
-
-        @Override
-        public List<ILSMMemoryComponent> getInactiveMemoryComponents() {
-            return List.of();
-        }
-
-        @Override
-        public void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent) {
-
-        }
-
-        // Additional method implementations required by the interface
-        // would be implemented here with minimal functionality
-
-        /**
-         * Sets the operation tracker for this mock index.
-         * Added to support DatasetLifecycleManagerLazyRecoveryTest use cases.
-         */
-        public void setOperationTracker(PrimaryIndexOperationTracker tracker) {
-            this.mockTracker = tracker;
-        }
-    }
-
-    /**
-     * Enhanced version of the MockIndex implementation that provides better
-     * tracking of operation tracker state changes.
-     */
-    private static class EnhancedMockIndex extends MockIndex {
-        private PrimaryIndexOperationTracker assignedTracker;
-        private final List<String> trackerEvents = new ArrayList<>();
-
-        public EnhancedMockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
-            super(resourcePath, datasetId, datasetResourceMap);
-        }
-
-        @Override
-        public ILSMOperationTracker getOperationTracker() {
-            PrimaryIndexOperationTracker tracker = (PrimaryIndexOperationTracker) super.getOperationTracker();
-
-            if (assignedTracker == null) {
-                assignedTracker = tracker;
-                trackerEvents.add("Initial tracker assignment: " + tracker);
-            } else if (assignedTracker != tracker) {
-                trackerEvents.add("Tracker changed from " + assignedTracker + " to " + tracker);
-                assignedTracker = tracker;
-            }
-
-            return tracker;
-        }
-
-        public List<String> getTrackerEvents() {
-            return trackerEvents;
-        }
-    }
-
-    /**
-     * Tests specific race conditions that could lead to null operation trackers during unregistration.
-     * This test verifies:
-     * 1. Concurrent registration, opening, and unregistration don't create null operation trackers
-     * 2. The operation tracker remains valid throughout concurrent operations
-     * 3. Operation tracker cleanup occurs only when appropriate
-     */
-    @Test
-    public void testRaceConditionsWithOperationTracker() throws Exception {
-        LOGGER.info("Starting testRaceConditionsWithOperationTracker");
-
-        // Create a resource path for testing
-        String resourcePath = createdResourcePaths.get(0);
-
-        // Get dataset and partition information for this resource
-        LocalResource resource = mockResources.get(resourcePath);
-        DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
-        int datasetId = datasetLocalResource.getDatasetId();
-        int partition = datasetLocalResource.getPartition();
-
-        // Setup an enhanced mock index for this test to track operation tracker events
-        EnhancedMockIndex enhancedMockIndex = new EnhancedMockIndex(resourcePath, datasetId, datasetResourceMap);
-        mockIndexes.put(resourcePath, enhancedMockIndex);
-
-        // Register the resource to make it available
-        IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
-        assertNotNull("Index should be registered successfully", index);
-        assertSame("Should get our enhanced mock index", enhancedMockIndex, index);
-
-        // Create multiple threads that will perform random operations on the same resource
-        final int NUM_CONCURRENT_THREADS = 10;
-        final int OPERATIONS_PER_THREAD = 20;
-        final CyclicBarrier startBarrier = new CyclicBarrier(NUM_CONCURRENT_THREADS);
-        final CountDownLatch completionLatch = new CountDownLatch(NUM_CONCURRENT_THREADS);
-
-        for (int i = 0; i < NUM_CONCURRENT_THREADS; i++) {
-            final int threadId = i;
-            executorService.submit(() -> {
-                try {
-                    startBarrier.await();
-                    Random random = new Random();
-
-                    for (int op = 0; op < OPERATIONS_PER_THREAD; op++) {
-                        // Perform a random operation:
-                        // 0 = check index existence
-                        // 1 = open
-                        // 2 = close
-                        // 3 = register
-                        // 4 = check operation tracker
-                        int operation = random.nextInt(5);
-
-                        switch (operation) {
-                            case 0:
-                                // Check if index exists
-                                IIndex existingIndex = datasetLifecycleManager.get(resourcePath);
-                                LOGGER.debug("Thread {} checked index existence: {}", threadId, existingIndex != null);
-                                break;
-                            case 1:
-                                // Open the index
-                                try {
-                                    datasetLifecycleManager.open(resourcePath);
-                                    LOGGER.debug("Thread {} opened index", threadId);
-                                } catch (Exception e) {
-                                    // This is expected occasionally since the resource might be unregistered
-                                    LOGGER.debug("Thread {} failed to open index: {}", threadId, e.getMessage());
-                                }
-                                break;
-                            case 2:
-                                // Close the index
-                                try {
-                                    datasetLifecycleManager.close(resourcePath);
-                                    LOGGER.debug("Thread {} closed index", threadId);
-                                } catch (Exception e) {
-                                    // This is expected occasionally
-                                    LOGGER.debug("Thread {} failed to close index: {}", threadId, e.getMessage());
-                                }
-                                break;
-                            case 3:
-                                // Register or re-register the index
-                                try {
-                                    IIndex reregisteredIndex =
-                                            datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
-                                    LOGGER.debug("Thread {} registered index: {}", threadId, reregisteredIndex != null);
-                                } catch (Exception e) {
-                                    LOGGER.debug("Thread {} failed to register index: {}", threadId, e.getMessage());
-                                }
-                                break;
-                            case 4:
-                                // Check operation tracker
-                                try {
-                                    // Get the dataset resource
-                                    DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-                                    if (datasetResource != null) {
-                                        // Get and verify the operation tracker
-                                        ILSMOperationTracker tracker = getOperationTracker(datasetResource, partition);
-                                        if (tracker instanceof PrimaryIndexOperationTracker) {
-                                            LOGGER.debug("Thread {} found valid PrimaryIndexOperationTracker: {}",
-                                                    threadId, tracker);
-                                        } else if (tracker != null) {
-                                            LOGGER.warn("Thread {} found non-PrimaryIndexOperationTracker: {}",
-                                                    threadId, tracker.getClass().getName());
-                                        } else {
-                                            LOGGER.debug("Thread {} found no operation tracker", threadId);
-                                        }
-                                    }
-                                } catch (Exception e) {
-                                    LOGGER.debug("Thread {} error checking operation tracker: {}", threadId,
-                                            e.getMessage());
-                                }
-                                break;
-                        }
-
-                        // Small delay to increase chance of race conditions
-                        Thread.sleep(random.nextInt(5));
-                    }
-
-                    LOGGER.debug("Thread {} completed all operations", threadId);
-                    completionLatch.countDown();
-                } catch (Exception e) {
-                    LOGGER.error("Thread {} error during test: {}", threadId, e.getMessage(), e);
-                    completionLatch.countDown();
-                }
-            });
-        }
-
-        // Wait for all threads to complete
-        boolean allCompleted = completionLatch.await(30, TimeUnit.SECONDS);
-        if (!allCompleted) {
-            LOGGER.warn("Not all threads completed in time");
-        }
-
-        // Get the tracker events to verify behavior
-        List<String> trackerEvents = enhancedMockIndex.getTrackerEvents();
-        LOGGER.debug("Tracker events recorded: {}", trackerEvents.size());
-        for (String event : trackerEvents) {
-            LOGGER.debug("Tracker event: {}", event);
-        }
-
-        // Verify the index is still registered at the end
-        IIndex finalIndex = datasetLifecycleManager.get(resourcePath);
-        if (finalIndex != null) {
-            // Get the final operation tracker state if the index is still registered
-            DatasetResource datasetResource = datasetResourceMap.get(datasetId);
-            if (datasetResource != null) {
-                ILSMOperationTracker finalTracker = getOperationTracker(datasetResource, partition);
-                LOGGER.debug("Final operation tracker state: {}", finalTracker);
-
-                // If there's a valid tracker, it should be a PrimaryIndexOperationTracker
-                if (finalTracker != null) {
-                    assertTrue("Final tracker should be a PrimaryIndexOperationTracker",
-                            finalTracker instanceof PrimaryIndexOperationTracker);
-                }
-            }
-        }
-
-        LOGGER.info("Completed testRaceConditionsWithOperationTracker");
-    }
-
-    // Helper method to ensure consistent register approach
-    public IIndex registerIfAbsentIndex(String resourcePath) throws HyracksDataException {
-        MockIndex mockIndex = mockIndexes.get(resourcePath);
-        IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
-        LOGGER.debug("Registered {} with mock index {}", resourcePath, mockIndex);
-        return index;
-    }
-
-    @Test
-    public void testBalancedOpenCloseUnregister() throws Exception {
-        LOGGER.info("Starting testBalancedOpenCloseUnregister");
-
-        // Select a resource to test with
-        String resourcePath = createdResourcePaths.get(0);
-
-        // Register the index
-        IIndex index = registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully", index);
-        LOGGER.info("Registered index: {}", resourcePath);
-
-        // Number of times to open/close
-        final int numOperations = 5;
-
-        // Open the index multiple times
-        for (int i = 0; i < numOperations; i++) {
-            datasetLifecycleManager.open(resourcePath);
-            LOGGER.info("Opened index {} (#{}/{})", resourcePath, i + 1, numOperations);
-        }
-
-        // Close the index the same number of times
-        for (int i = 0; i < numOperations; i++) {
-            datasetLifecycleManager.close(resourcePath);
-            LOGGER.info("Closed index {} (#{}/{})", resourcePath, i + 1, numOperations);
-        }
-
-        // At this point, reference count should be balanced and unregistration should succeed
-        try {
-            datasetLifecycleManager.unregister(resourcePath);
-            LOGGER.info("Successfully unregistered index {} after balanced open/close operations", resourcePath);
-
-            // After unregistration, simulate real behavior by making the repository return null for this path
-            // This is what would happen in a real environment - the resource would be removed from storage
-            mockResources.remove(resourcePath);
-            LOGGER.info("Removed resource {} from mock resources", resourcePath);
-        } catch (HyracksDataException e) {
-            fail("Failed to unregister index after balanced open/close: " + e.getMessage());
-        }
-
-        // Verify the index is actually unregistered by attempting to get it
-        IIndex retrievedIndex = datasetLifecycleManager.get(resourcePath);
-        assertNull("Index should no longer be registered after unregistration", retrievedIndex);
-
-        // Verify the resource is no longer in our mock repository
-        LocalResource retrievedResource = mockResourceRepository.get(resourcePath);
-        assertNull("Resource should be removed from repository after unregistration", retrievedResource);
-
-        // Try to open the unregistered index - should fail
-        boolean openFailed = false;
-        try {
-            datasetLifecycleManager.open(resourcePath);
-        } catch (HyracksDataException e) {
-            openFailed = true;
-            LOGGER.info("As expected, failed to open unregistered index: {}", e.getMessage());
-            assertTrue("Error should indicate the index doesn't exist",
-                    e.getMessage().contains("Index does not exist"));
-        }
-        assertTrue("Opening an unregistered index should fail", openFailed);
-    }
-
-    // Add these accessor methods at the end of the class
-
-    /**
-     * Returns the dataset lifecycle manager for testing
-     */
-    public DatasetLifecycleManager getDatasetLifecycleManager() {
-        return datasetLifecycleManager;
-    }
-
-    /**
-     * Returns the mock service context for testing
-     */
-    public INCServiceContext getMockServiceContext() {
-        return mockServiceContext;
-    }
-
-    /**
-     * Returns the list of created resource paths
-     */
-    public List<String> getCreatedResourcePaths() {
-        return createdResourcePaths;
-    }
-
-    /**
-     * Returns the map of mock resources
-     */
-    public Map<String, LocalResource> getMockResources() {
-        return mockResources;
-    }
-
-    /**
-     * Returns the mock recovery manager
-     */
-    public IRecoveryManager getMockRecoveryManager() {
-        return mockRecoveryManager;
-    }
-
-    /**
-     * Returns the mock index for a path
-     */
-    public MockIndex getMockIndex(String resourcePath) {
-        return mockIndexes.get(resourcePath);
-    }
-
-    /**
-     * Returns the mock resource repository
-     */
-    public ILocalResourceRepository getMockResourceRepository() {
-        return mockResourceRepository;
-    }
-
-    /**
-     * Creates a mock index for a resource path and adds it to the mockIndexes map.
-     * This is used by DatasetLifecycleManagerLazyRecoveryTest to ensure consistent mock creation.
-     */
-    public void createMockIndexForPath(String resourcePath, int datasetId, int partition,
-            PrimaryIndexOperationTracker opTracker) throws HyracksDataException {
-        // Extract needed dataset ID from the path or use provided one
-        MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
-
-        // Set the operation tracker directly if provided
-        if (opTracker != null) {
-            mockIndex.setOperationTracker(opTracker);
-        }
-
-        // Store the mock index
-        mockIndexes.put(resourcePath, mockIndex);
-
-        // Create an answer that returns this mock index
-        DatasetLocalResource mockDatasetResource = null;
-        for (Map.Entry<String, LocalResource> entry : mockResources.entrySet()) {
-            if (entry.getKey().equals(resourcePath)) {
-                mockDatasetResource = (DatasetLocalResource) entry.getValue().getResource();
-                break;
-            }
-        }
-
-        if (mockDatasetResource != null) {
-            when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
-        }
-
-        LOGGER.debug("Created mock index for {}: {}", resourcePath, mockIndex);
-    }
-}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
deleted file mode 100644
index a4b5a55..0000000
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
+++ /dev/null
@@ -1,1223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Tests the behavior of the DatasetLifecycleManager with lazy recovery enabled.
- * This test class focuses on verifying that lazy recovery works correctly.
- */
-public class DatasetLifecycleManagerLazyRecoveryTest {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    private DatasetLifecycleManagerConcurrentTest concurrentTest;
-    private List<String> recoveredResourcePaths;
-    private Set<String> datasetPartitionPaths;
-    private Map<String, FileReference> pathToFileRefMap;
-    private ExecutorService executorService;
-
-    @Before
-    public void setUp() throws Exception {
-        // Create and initialize the base test
-        concurrentTest = new DatasetLifecycleManagerConcurrentTest();
-        concurrentTest.setUp();
-
-        // Enable lazy recovery
-        concurrentTest.setLazyRecoveryEnabled(true);
-        LOGGER.info("Lazy recovery enabled for all tests");
-
-        // Track recovered resources
-        recoveredResourcePaths = new ArrayList<>();
-        datasetPartitionPaths = new TreeSet<>();
-        pathToFileRefMap = new HashMap<>();
-
-        // Extract the dataset partition paths from the resource paths and log partition info
-        // These are the parent directories that contain multiple index files
-        // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
-        // Note: The replication factor is ALWAYS 0 in production and should be 0 in all tests
-        Pattern pattern = Pattern.compile("(.*?/partition_(\\d+)/[^/]+/[^/]+/[^/]+/(\\d+))/.+");
-
-        // Map to collect resources by partition for logging
-        Map<Integer, List<String>> resourcesByPartition = new HashMap<>();
-
-        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
-            Matcher matcher = pattern.matcher(resourcePath);
-            if (matcher.matches()) {
-                String partitionPath = matcher.group(1); // Now includes the replication factor
-                int partitionId = Integer.parseInt(matcher.group(2));
-                int replicationFactor = Integer.parseInt(matcher.group(3)); // Usually 0
-
-                LOGGER.info("Resource path: {}", resourcePath);
-                LOGGER.info("  Partition path: {}", partitionPath);
-                LOGGER.info("  Partition ID: {}", partitionId);
-                LOGGER.info("  Replication factor: {}", replicationFactor);
-
-                // Track resources by partition for logging
-                resourcesByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(resourcePath);
-
-                // Add to our partition paths for mocking
-                datasetPartitionPaths.add(partitionPath);
-
-                // Create a FileReference mock for this partition path
-                FileReference mockPartitionRef = mock(FileReference.class);
-                when(mockPartitionRef.getAbsolutePath()).thenReturn(partitionPath);
-                pathToFileRefMap.put(partitionPath, mockPartitionRef);
-            } else {
-                LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
-            }
-        }
-
-        // Log resources by partition for debugging
-        LOGGER.info("Found {} dataset partition paths", datasetPartitionPaths.size());
-        for (Map.Entry<Integer, List<String>> entry : resourcesByPartition.entrySet()) {
-            LOGGER.info("Partition {}: {} resources", entry.getKey(), entry.getValue().size());
-            for (String path : entry.getValue()) {
-                LOGGER.info("  - {}", path);
-            }
-        }
-
-        // Mock the IOManager.resolve() method to return our mock FileReferences
-        IIOManager mockIOManager = concurrentTest.getMockServiceContext().getIoManager();
-        doAnswer(inv -> {
-            String path = inv.getArgument(0);
-            // For each possible partition path, check if this is the path being resolved
-            for (String partitionPath : datasetPartitionPaths) {
-                if (path.equals(partitionPath)) {
-                    LOGGER.info("Resolving path {} to FileReference", path);
-                    return pathToFileRefMap.get(partitionPath);
-                }
-            }
-            // If no match, create a new mock FileReference
-            FileReference mockFileRef = mock(FileReference.class);
-            when(mockFileRef.getAbsolutePath()).thenReturn(path);
-            when(mockFileRef.toString()).thenReturn(path);
-            return mockFileRef;
-        }).when(mockIOManager).resolve(anyString());
-
-        // Create executor service for concurrent tests
-        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-    }
-
-    /**
-     * Tests that when an index is opened with lazy recovery enabled, 
-     * all resources under the same dataset partition path are recovered together.
-     * This verifies the batch recovery behavior for all resources with the same prefix path.
-     */
-    @Test
-    public void testLazyRecoveryForEntirePartition() throws Exception {
-        LOGGER.info("Starting testLazyRecoveryForEntirePartition");
-
-        // Get resources for one dataset partition
-        String datasetPartitionPath = datasetPartitionPaths.iterator().next();
-        List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
-                .filter(path -> path.startsWith(datasetPartitionPath)).collect(Collectors.toList());
-
-        LOGGER.info("Testing partition path: {}", datasetPartitionPath);
-        LOGGER.info("Resources in this partition: {}", resourcesInPartition);
-
-        // Mock the resource repository to track calls to getResources
-        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
-        // Setup capture of resources that get recovered - using FileReference
-        doAnswer(inv -> {
-            List<FileReference> refs = inv.getArgument(1);
-            if (refs != null && !refs.isEmpty()) {
-                FileReference fileRef = refs.get(0);
-                String rootPath = fileRef.getAbsolutePath();
-                LOGGER.info("Looking up resources with FileReference: {}", fileRef);
-                LOGGER.info("FileReference absolute path: {}", rootPath);
-
-                // The rootPath from DatasetLifecycleManager now includes the replication factor
-                // e.g., storage/partition_1/Default/SDefault/ds0/0
-                LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
-
-                // Get all resources that match this root path
-                Map<Long, LocalResource> matchingResources = new HashMap<>();
-
-                // Convert our string-keyed map to long-keyed map as expected by the API
-                // Note: We need to match exactly, not just prefix, because the rootPath now includes the replication factor
-                concurrentTest.getMockResources().entrySet().stream()
-                        .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
-                        .forEach(entry -> {
-                            LocalResource resource = entry.getValue();
-                            matchingResources.put(resource.getId(), resource);
-                            // Record which resources were recovered for our test verification
-                            recoveredResourcePaths.add(entry.getKey());
-                            LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
-                                    resource.getId());
-                        });
-
-                LOGGER.info("Found {} resources to recover", matchingResources.size());
-
-                // Return resources mapped by ID as the real implementation would
-                return matchingResources;
-            }
-            return Map.of();
-        }).when(mockRepository).getResources(any(), anyList());
-
-        // Setup capture for recoverIndexes calls
-        doAnswer(inv -> {
-            List<ILSMIndex> indexes = inv.getArgument(0);
-            if (indexes != null && !indexes.isEmpty()) {
-                LOGGER.info("Recovering {} indexes", indexes.size());
-                for (ILSMIndex index : indexes) {
-                    LOGGER.info("Recovering index: {}", index);
-                }
-            }
-            return null;
-        }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
-
-        // Choose the first resource in the partition to register and open
-        String resourcePath = resourcesInPartition.get(0);
-
-        // Register the index
-        IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully", index);
-        LOGGER.info("Successfully registered index at {}", resourcePath);
-
-        // Open the index - should trigger lazy recovery
-        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
-        LOGGER.info("Successfully opened index at {}", resourcePath);
-
-        // Verify the recovery manager was consulted
-        verify(concurrentTest.getMockRecoveryManager(), atLeastOnce()).isLazyRecoveryEnabled();
-
-        // Verify that all resources in the partition were considered for recovery
-        LOGGER.info("Verifying recovery for resources in partition: {}", resourcesInPartition);
-        LOGGER.info("Resources marked as recovered: {}", recoveredResourcePaths);
-
-        for (String path : resourcesInPartition) {
-            boolean wasRecovered = isResourcePathRecovered(path);
-            LOGGER.info("Resource {} - recovered: {}", path, wasRecovered);
-            assertTrue("Resource should have been considered for recovery: " + path, wasRecovered);
-        }
-
-        // Cleanup
-        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-        concurrentTest.getMockResources().remove(resourcePath);
-
-        // Verify cleanup was successful
-        assertNull("Index should no longer be registered",
-                concurrentTest.getDatasetLifecycleManager().get(resourcePath));
-    }
-
-    /**
-     * Tests that lazy recovery is only performed the first time an index is opened
-     * and is skipped on subsequent opens.
-     */
-    @Test
-    public void testLazyRecoveryOnlyHappensOnce() throws Exception {
-        LOGGER.info("Starting testLazyRecoveryOnlyHappensOnce");
-
-        // Get a resource path
-        String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
-        LOGGER.info("Testing with resource: {}", resourcePath);
-
-        // Find the dataset partition path for this resource
-        String partitionPath = null;
-        for (String path : datasetPartitionPaths) {
-            if (resourcePath.startsWith(path)) {
-                partitionPath = path;
-                break;
-            }
-        }
-        LOGGER.info("Resource belongs to partition: {}", partitionPath);
-
-        // Track recovery calls
-        List<String> recoveredPaths = new ArrayList<>();
-
-        // Mock repository to track resource lookups during recovery
-        final Map<Integer, Integer> recoveryCount = new HashMap<>();
-        recoveryCount.put(0, 0); // First recovery count
-        recoveryCount.put(1, 0); // Second recovery count
-
-        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
-        // Setup capture of resources that get recovered
-        doAnswer(inv -> {
-            List<FileReference> refs = inv.getArgument(1);
-            if (refs != null && !refs.isEmpty()) {
-                FileReference fileRef = refs.get(0);
-                String rootPath = fileRef.getAbsolutePath();
-
-                // Get all resources that match this root path
-                Map<Long, LocalResource> matchingResources = new HashMap<>();
-
-                concurrentTest.getMockResources().entrySet().stream()
-                        .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
-                            LocalResource resource = entry.getValue();
-                            matchingResources.put(resource.getId(), resource);
-                            recoveredPaths.add(entry.getKey());
-                        });
-
-                // Increment recovery counter based on which open we're on
-                if (recoveryCount.get(1) == 0) {
-                    recoveryCount.put(0, recoveryCount.get(0) + matchingResources.size());
-                    LOGGER.info("First open recovery finding {} resources", matchingResources.size());
-                } else {
-                    recoveryCount.put(1, recoveryCount.get(1) + matchingResources.size());
-                    LOGGER.info("Second open recovery finding {} resources", matchingResources.size());
-                }
-
-                return matchingResources;
-            }
-            return Map.of();
-        }).when(mockRepository).getResources(any(), anyList());
-
-        // Mock recovery behavior to track calls
-        doAnswer(inv -> {
-            List<ILSMIndex> indexes = inv.getArgument(0);
-            if (indexes != null && !indexes.isEmpty()) {
-                for (ILSMIndex index : indexes) {
-                    String path = index.toString();
-                    LOGGER.info("Recovering index: {}", path);
-                }
-            }
-            return null;
-        }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
-
-        // Register the index
-        IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully", index);
-
-        // First open - should trigger recovery
-        LOGGER.info("First open of index");
-        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
-        LOGGER.info("First open recovery found {} resources", recoveryCount.get(0));
-
-        // Close the index
-        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-
-        // Second open - should NOT trigger recovery again
-        LOGGER.info("Second open of index");
-        recoveryCount.put(1, 0); // Reset second recovery counter
-        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
-        LOGGER.info("Second open recovery found {} resources", recoveryCount.get(1));
-
-        // Verify recovery was skipped on second open
-        assertTrue("First open should recover resources", recoveryCount.get(0) > 0);
-        assertTrue("Recovery should not happen on subsequent opens",
-                recoveryCount.get(1) == 0 || recoveryCount.get(1) < recoveryCount.get(0));
-
-        // Cleanup
-        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-        concurrentTest.getMockResources().remove(resourcePath);
-    }
-
-    /**
-     * A comprehensive test that runs multiple operations with lazy recovery enabled.
-     * This test verifies that lazy recovery works correctly across various scenarios:
-     * - Opening multiple resources in the same partition
-     * - Operations across multiple partitions
-     * - Reference counting with multiple open/close
-     * - Proper unregistration after use
-     */
-    @Test
-    public void testComprehensiveLazyRecoveryWithMultipleResources() throws Exception {
-        LOGGER.info("Starting testComprehensiveLazyRecoveryWithMultipleResources");
-
-        // Create sets to track which resources have been recovered
-        Set<String> recoveredResources = new TreeSet<>();
-        Set<String> recoveredPartitions = new TreeSet<>();
-
-        // Mock repository to track resource lookups during recovery
-        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
-        // Setup capture of resources that get recovered
-        doAnswer(inv -> {
-            List<FileReference> refs = inv.getArgument(1);
-            if (refs != null && !refs.isEmpty()) {
-                FileReference fileRef = refs.get(0);
-                String rootPath = fileRef.getAbsolutePath();
-                LOGGER.info("Looking up resources with root path: {}", rootPath);
-
-                // Track this partition as being recovered
-                recoveredPartitions.add(rootPath);
-
-                // Get all resources that match this root path
-                Map<Long, LocalResource> matchingResources = new HashMap<>();
-
-                concurrentTest.getMockResources().entrySet().stream()
-                        .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
-                            LocalResource resource = entry.getValue();
-                            matchingResources.put(resource.getId(), resource);
-                            recoveredResources.add(entry.getKey());
-                            LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
-                                    resource.getId());
-                        });
-
-                return matchingResources;
-            }
-            return Map.of();
-        }).when(mockRepository).getResources(any(), anyList());
-
-        // Group resources by partition for easier testing
-        Map<String, List<String>> resourcesByPartition = new HashMap<>();
-
-        for (String partitionPath : datasetPartitionPaths) {
-            List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
-                    .filter(path -> path.startsWith(partitionPath)).collect(Collectors.toList());
-
-            resourcesByPartition.put(partitionPath, resourcesInPartition);
-        }
-
-        LOGGER.info("Found {} partitions with resources", resourcesByPartition.size());
-
-        // For each partition, perform a series of operations
-        for (Map.Entry<String, List<String>> entry : resourcesByPartition.entrySet()) {
-            String partitionPath = entry.getKey();
-            List<String> resources = entry.getValue();
-
-            LOGGER.info("Testing partition: {} with {} resources", partitionPath, resources.size());
-
-            // 1. Register and open the first resource - should trigger recovery for all resources in this partition
-            String firstResource = resources.get(0);
-            LOGGER.info("Registering and opening first resource: {}", firstResource);
-
-            IIndex firstIndex = concurrentTest.registerIfAbsentIndex(firstResource);
-            assertNotNull("First index should be registered successfully", firstIndex);
-
-            // Open the index - should trigger lazy recovery for all resources in this partition
-            concurrentTest.getDatasetLifecycleManager().open(firstResource);
-
-            // Verify partition was recovered
-            assertTrue("Partition should be marked as recovered", recoveredPartitions.contains(partitionPath));
-
-            // 2. Register and open a second resource if available - should NOT trigger recovery again
-            if (resources.size() > 1) {
-                // Clear recovery tracking before opening second resource
-                int beforeSize = recoveredResources.size();
-                String secondResource = resources.get(1);
-
-                LOGGER.info("Registering and opening second resource: {}", secondResource);
-                IIndex secondIndex = concurrentTest.registerIfAbsentIndex(secondResource);
-                assertNotNull("Second index should be registered successfully", secondIndex);
-
-                // Open the second index - should NOT trigger lazy recovery again for this partition
-                concurrentTest.getDatasetLifecycleManager().open(secondResource);
-
-                // Verify no additional recovery happened
-                int afterSize = recoveredResources.size();
-                LOGGER.info("Resources recovered before: {}, after: {}", beforeSize, afterSize);
-                // We might see some recovery of the specific resource, but not the whole partition again
-
-                // 3. Open and close the second resource multiple times to test reference counting
-                for (int i = 0; i < 3; i++) {
-                    concurrentTest.getDatasetLifecycleManager().close(secondResource);
-                    concurrentTest.getDatasetLifecycleManager().open(secondResource);
-                }
-
-                // Close and unregister the second resource
-                concurrentTest.getDatasetLifecycleManager().close(secondResource);
-                concurrentTest.getDatasetLifecycleManager().unregister(secondResource);
-                LOGGER.info("Successfully closed and unregistered second resource: {}", secondResource);
-            }
-
-            // 4. Close and unregister the first resource
-            concurrentTest.getDatasetLifecycleManager().close(firstResource);
-            concurrentTest.getDatasetLifecycleManager().unregister(firstResource);
-            LOGGER.info("Successfully closed and unregistered first resource: {}", firstResource);
-
-            // Verify both resources are truly unregistered
-            for (String resource : resources.subList(0, Math.min(2, resources.size()))) {
-                assertNull("Resource should be unregistered: " + resource,
-                        concurrentTest.getDatasetLifecycleManager().get(resource));
-            }
-        }
-
-        // Summary of recovery
-        LOGGER.info("Recovered {} partitions out of {}", recoveredPartitions.size(), datasetPartitionPaths.size());
-        LOGGER.info("Recovered {} resources out of {}", recoveredResources.size(),
-                concurrentTest.getCreatedResourcePaths().size());
-    }
-
-    /**
-     * Tests that open operation succeeds after unregister + register sequence.
-     * This verifies the operation tracker is properly reset when re-registering.
-     */
-    @Test
-    public void testUnregisterRegisterIfAbsentOpenSequence() throws Exception {
-        LOGGER.info("Starting testUnregisterRegisterOpenSequence");
-
-        // Get a resource path
-        String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
-        LOGGER.info("Testing with resource: {}", resourcePath);
-
-        // First lifecycle - register, open, close, unregister
-        IIndex firstIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully in first lifecycle", firstIndex);
-
-        concurrentTest.getDatasetLifecycleManager().open(resourcePath);
-        LOGGER.info("Successfully opened index in first lifecycle");
-
-        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-        LOGGER.info("Successfully closed and unregistered index in first lifecycle");
-
-        // Second lifecycle - register again, open, close, unregister
-        IIndex secondIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
-        assertNotNull("Index should be registered successfully in second lifecycle", secondIndex);
-
-        // This open should succeed if the operation tracker was properly reset
-        try {
-            concurrentTest.getDatasetLifecycleManager().open(resourcePath);
-            LOGGER.info("Successfully opened index in second lifecycle");
-        } catch (Exception e) {
-            LOGGER.error("Failed to open index in second lifecycle", e);
-            fail("Open should succeed in second lifecycle: " + e.getMessage());
-        }
-
-        // Cleanup
-        concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-        concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-        LOGGER.info("Successfully completed two full lifecycles with the same resource");
-    }
-
-    /**
-     * Tests that recovery works correctly across different partitions.
-     * This test specifically verifies that partition IDs are correctly propagated
-     * during the recovery process.
-     */
-    @Test
-    public void testRecoveryAcrossPartitions() throws Exception {
-        LOGGER.info("Starting testRecoveryAcrossPartitions");
-
-        // Skip test if we don't have at least 2 partitions
-        if (datasetPartitionPaths.size() < 2) {
-            LOGGER.info("Skipping testRecoveryAcrossPartitions - need at least 2 partitions");
-            return;
-        }
-
-        // Create maps to track recovery by partition
-        Map<Integer, Boolean> partitionRecovered = new HashMap<>();
-        Map<Integer, Set<String>> resourcesRecoveredByPartition = new HashMap<>();
-
-        // Find resources from different partitions
-        Map<Integer, String> resourcesByPartition = new HashMap<>();
-        // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
-        Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
-
-        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
-            Matcher matcher = pattern.matcher(resourcePath);
-            if (matcher.matches()) {
-                int partitionId = Integer.parseInt(matcher.group(1));
-                resourcesByPartition.putIfAbsent(partitionId, resourcePath);
-                partitionRecovered.put(partitionId, false);
-                resourcesRecoveredByPartition.put(partitionId, new TreeSet<>());
-            }
-        }
-
-        LOGGER.info("Found resources in {} different partitions: {}", resourcesByPartition.size(),
-                resourcesByPartition.keySet());
-
-        // Mock repository to track which partition's resources are being recovered
-        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-
-        // Setup capture of resources that get recovered
-        doAnswer(inv -> {
-            List<FileReference> refs = inv.getArgument(1);
-            if (refs != null && !refs.isEmpty()) {
-                FileReference fileRef = refs.get(0);
-                String rootPath = fileRef.getAbsolutePath();
-                LOGGER.info("Looking up resources with root path: {}", rootPath);
-
-                // The rootPath now includes the replication factor
-                LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
-
-                // Get the partition ID from the path
-                Matcher matcher = Pattern.compile(".*?/partition_(\\d+)/.+").matcher(rootPath);
-                if (matcher.matches()) {
-                    int partitionId = Integer.parseInt(matcher.group(1));
-                    partitionRecovered.put(partitionId, true);
-                    LOGGER.info("Recovery requested for partition {}", partitionId);
-                } else {
-                    LOGGER.warn("Could not extract partition ID from path: {}", rootPath);
-                }
-
-                // Get all resources that match this root path
-                Map<Long, LocalResource> matchingResources = new HashMap<>();
-
-                concurrentTest.getMockResources().entrySet().stream()
-                        .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
-                        .forEach(entry -> {
-                            LocalResource resource = entry.getValue();
-                            matchingResources.put(resource.getId(), resource);
-
-                            // Extract partition ID from path
-                            Matcher m = Pattern.compile(".*?/partition_(\\d+)/.+").matcher(entry.getKey());
-                            if (m.matches()) {
-                                int partitionId = Integer.parseInt(m.group(1));
-                                resourcesRecoveredByPartition.get(partitionId).add(entry.getKey());
-                                LOGGER.info("Resource {} added to recovery for partition {}", entry.getKey(),
-                                        partitionId);
-                            }
-                        });
-
-                LOGGER.info("Found {} resources to recover", matchingResources.size());
-                return matchingResources;
-            }
-            return Map.of();
-        }).when(mockRepository).getResources(any(), anyList());
-
-        // Open resources from each partition
-        for (Map.Entry<Integer, String> entry : resourcesByPartition.entrySet()) {
-            int partitionId = entry.getKey();
-            String resourcePath = entry.getValue();
-
-            LOGGER.info("Testing recovery for partition {} with resource {}", partitionId, resourcePath);
-
-            // Register and open the resource
-            IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
-            assertNotNull("Index should be registered successfully", index);
-
-            // Get the LocalResource and verify its partition ID
-            LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-            int resourcePartitionId = datasetLocalResource.getPartition();
-
-            LOGGER.info("Resource {} has partition ID {} (expected {})", resourcePath, resourcePartitionId,
-                    partitionId);
-
-            // Open the index - should trigger lazy recovery for this partition
-            concurrentTest.getDatasetLifecycleManager().open(resourcePath);
-            LOGGER.info("Successfully opened index for partition {}", partitionId);
-
-            // Verify this partition was recovered
-            assertTrue("Partition " + partitionId + " should be marked as recovered",
-                    partitionRecovered.get(partitionId));
-
-            // Verify resources for this partition were recovered
-            Set<String> recoveredResources = resourcesRecoveredByPartition.get(partitionId);
-            assertTrue("At least one resource should be recovered for partition " + partitionId,
-                    !recoveredResources.isEmpty());
-
-            // Cleanup
-            concurrentTest.getDatasetLifecycleManager().close(resourcePath);
-            concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-        }
-
-        // Summary of recovery by partition
-        for (Map.Entry<Integer, Boolean> entry : partitionRecovered.entrySet()) {
-            int partitionId = entry.getKey();
-            boolean recovered = entry.getValue();
-            int resourceCount = resourcesRecoveredByPartition.get(partitionId).size();
-
-            LOGGER.info("Partition {}: Recovered={}, Resources recovered={}", partitionId, recovered, resourceCount);
-        }
-    }
-
-    /**
-     * Tests that the partition ID in DatasetLocalResource objects is correctly set.
-     * This test specifically verifies that partition IDs match the resource path.
-     */
-    @Test
-    public void testPartitionIdMatchesPath() throws Exception {
-        LOGGER.info("Starting testPartitionIdMatchesPath");
-
-        // Find resources from different partitions
-        Map<Integer, Set<String>> resourcesByPartition = new HashMap<>();
-        Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
-
-        // First, collect all resources by partition
-        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
-            Matcher matcher = pattern.matcher(resourcePath);
-            if (matcher.matches()) {
-                int partitionId = Integer.parseInt(matcher.group(1));
-                resourcesByPartition.computeIfAbsent(partitionId, k -> new TreeSet<>()).add(resourcePath);
-            }
-        }
-
-        boolean incorrectPartitionIdFound = false;
-
-        // Examine each resource and check its partition ID
-        for (Map.Entry<Integer, Set<String>> entry : resourcesByPartition.entrySet()) {
-            int expectedPartitionId = entry.getKey();
-            Set<String> resources = entry.getValue();
-
-            LOGGER.info("Checking {} resources for partition {}", resources.size(), expectedPartitionId);
-
-            for (String resourcePath : resources) {
-                LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
-                if (localResource != null) {
-                    DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-                    int actualPartitionId = datasetLocalResource.getPartition();
-
-                    LOGGER.info("Resource: {}, Expected partition: {}, Actual partition: {}", resourcePath,
-                            expectedPartitionId, actualPartitionId);
-
-                    if (expectedPartitionId != actualPartitionId) {
-                        LOGGER.error("PARTITION ID MISMATCH for resource {}! Expected: {}, Actual: {}", resourcePath,
-                                expectedPartitionId, actualPartitionId);
-                        incorrectPartitionIdFound = true;
-                    }
-                } else {
-                    LOGGER.warn("Resource not found in mockResources: {}", resourcePath);
-                }
-            }
-        }
-
-        // This assertion will fail if any partition ID is incorrect
-        assertFalse("Incorrect partition IDs found in resources", incorrectPartitionIdFound);
-
-        // Now test DatasetLifecycleManager.getDatasetLocalResource directly
-        LOGGER.info("Testing DatasetLifecycleManager.getDatasetLocalResource directly");
-
-        // Find a resource in partition 1 (if available)
-        Set<String> partition1Resources = resourcesByPartition.getOrDefault(1, Collections.emptySet());
-        if (!partition1Resources.isEmpty()) {
-            String resourcePath = partition1Resources.iterator().next();
-            LOGGER.info("Testing getDatasetLocalResource with partition 1 resource: {}", resourcePath);
-
-            // Register the resource
-            IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
-            assertNotNull("Index should be registered successfully", index);
-
-            // Directly access the DatasetLocalResource to check partition ID
-            // We'll need to use reflection since getDatasetLocalResource is private
-            try {
-                // Get the private method
-                Method getDatasetLocalResourceMethod =
-                        DatasetLifecycleManager.class.getDeclaredMethod("getDatasetLocalResource", String.class);
-                getDatasetLocalResourceMethod.setAccessible(true);
-
-                // Invoke the method
-                DatasetLocalResource result = (DatasetLocalResource) getDatasetLocalResourceMethod
-                        .invoke(concurrentTest.getDatasetLifecycleManager(), resourcePath);
-
-                assertNotNull("DatasetLocalResource should not be null", result);
-                int actualPartitionId = result.getPartition();
-                LOGGER.info("getDatasetLocalResource for {} returned partition ID: {}", resourcePath,
-                        actualPartitionId);
-                assertEquals("Partition ID should be 1", 1, actualPartitionId);
-            } catch (Exception e) {
-                LOGGER.error("Error accessing getDatasetLocalResource method", e);
-                fail("Failed to access getDatasetLocalResource method: " + e.getMessage());
-            }
-
-            // Clean up
-            concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-        } else {
-            LOGGER.info("No resources found in partition 1, skipping direct test");
-        }
-    }
-
-    /**
-     * Tests that the mock resources are correctly set up with the proper partition IDs.
-     * This test directly examines the mockResources map to verify the mocks.
-     */
-    @Test
-    public void testMockResourcesSetup() throws Exception {
-        LOGGER.info("Starting testMockResourcesSetup");
-
-        // Extract partition IDs from paths and compare with DatasetLocalResource partitions
-        Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
-        int correctResources = 0;
-        int incorrectResources = 0;
-
-        LOGGER.info("Total mock resources: {}", concurrentTest.getMockResources().size());
-
-        // Print out all setupMockResource calls that would be needed to recreate the resources
-        for (Map.Entry<String, LocalResource> entry : concurrentTest.getMockResources().entrySet()) {
-            String resourcePath = entry.getKey();
-            LocalResource localResource = entry.getValue();
-
-            // Extract the expected partition ID from the path
-            Matcher matcher = pattern.matcher(resourcePath);
-            if (matcher.matches()) {
-                int expectedPartitionId = Integer.parseInt(matcher.group(1));
-
-                // Get the actual partition ID from the resource
-                Object resourceObj = localResource.getResource();
-                if (resourceObj instanceof DatasetLocalResource) {
-                    DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resourceObj;
-                    int actualPartitionId = datasetLocalResource.getPartition();
-                    int datasetId = datasetLocalResource.getDatasetId();
-                    long resourceId = localResource.getId();
-
-                    // Log resource details
-                    LOGGER.info(
-                            "Resource: {}, Expected Partition: {}, Actual Partition: {}, Dataset ID: {}, Resource ID: {}",
-                            resourcePath, expectedPartitionId, actualPartitionId, datasetId, resourceId);
-
-                    // Generate the setupMockResource call that would create this resource
-                    LOGGER.info("setupMockResource(\"{}\", {}, {}, {});", resourcePath, datasetId, expectedPartitionId,
-                            resourceId);
-
-                    if (expectedPartitionId == actualPartitionId) {
-                        correctResources++;
-                    } else {
-                        incorrectResources++;
-                        LOGGER.error("PARTITION MISMATCH in mock resources: {} (expected: {}, actual: {})",
-                                resourcePath, expectedPartitionId, actualPartitionId);
-                    }
-                } else {
-                    LOGGER.warn("Resource object is not a DatasetLocalResource: {}", resourceObj);
-                }
-            } else {
-                LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
-            }
-        }
-
-        LOGGER.info("Resources with correct partition IDs: {}", correctResources);
-        LOGGER.info("Resources with incorrect partition IDs: {}", incorrectResources);
-
-        // Fail the test if any resources have incorrect partition IDs
-        assertEquals("All resources should have correct partition IDs", 0, incorrectResources);
-
-        // Now test a direct mock creation and retrieval to see if that works correctly
-        LOGGER.info("Testing direct mock creation and retrieval");
-
-        // Create a test mock resource with partition 1
-        String testPath = "test/partition_1/Default/SDefault/testDs/0/testIndex"; // 0 is replication factor (always 0)
-        int testDatasetId = 999;
-        int testPartition = 1;
-        long testResourceId = 12345;
-
-        // Create a mock dataset local resource
-        DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
-        when(mockDatasetResource.getDatasetId()).thenReturn(testDatasetId);
-        when(mockDatasetResource.getPartition()).thenReturn(testPartition);
-
-        // Create a mock LSMINDEX (not just IIndex) since recovery expects ILSMIndex
-        ILSMIndex mockIndex = mock(ILSMIndex.class);
-        when(mockIndex.toString()).thenReturn(testPath);
-
-        // Add necessary behavior for LSM operations
-        when(mockIndex.isDurable()).thenReturn(true);
-        when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-
-        // Allow this resource to be created
-        when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
-
-        // Create a mock local resource
-        LocalResource mockLocalResource = mock(LocalResource.class);
-        when(mockLocalResource.getId()).thenReturn(testResourceId);
-        when(mockLocalResource.getPath()).thenReturn(testPath);
-        when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
-
-        // Add to mock resources map
-        concurrentTest.getMockResources().put(testPath, mockLocalResource);
-
-        // Create a mock file reference
-        FileReference mockFileRef = mock(FileReference.class);
-        when(mockFileRef.getRelativePath()).thenReturn(testPath);
-        when(mockFileRef.getAbsolutePath()).thenReturn(testPath);
-        when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(testPath))
-                .thenReturn(mockFileRef);
-    }
-
-    /**
-     * Creates a mock resource for testing.
-     */
-    private void setupMockResourceForTest(String resourcePath, int datasetId, int partition, long resourceId)
-            throws HyracksDataException {
-        // Create a mock dataset local resource
-        DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
-        when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
-        when(mockDatasetResource.getPartition()).thenReturn(partition);
-
-        // Ensure we have a dataset resource
-        DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
-
-        // Create op tracker if needed
-        PrimaryIndexOperationTracker opTracker = datasetResource.getOpTracker(partition);
-        if (opTracker == null) {
-            opTracker = mock(PrimaryIndexOperationTracker.class);
-            datasetResource.setPrimaryIndexOperationTracker(partition, opTracker);
-            LOGGER.info("Created operation tracker for dataset {} partition {}", datasetId, partition);
-        }
-
-        // Create a mock local resource
-        LocalResource mockLocalResource = mock(LocalResource.class);
-        when(mockLocalResource.getId()).thenReturn(resourceId);
-        when(mockLocalResource.getPath()).thenReturn(resourcePath);
-        when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
-
-        // Add to mock resources map
-        concurrentTest.getMockResources().put(resourcePath, mockLocalResource);
-
-        // Create a mock file reference
-        FileReference mockFileRef = mock(FileReference.class);
-        when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
-        when(mockFileRef.getAbsolutePath()).thenReturn(resourcePath);
-        when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(resourcePath))
-                .thenReturn(mockFileRef);
-
-        // Ensure the mock index is the one that will be returned during registration
-        concurrentTest.createMockIndexForPath(resourcePath, datasetId, partition, opTracker);
-    }
-
-    /**
-     * Tests that unregister operation succeeds during lazy recovery.
-     * This test verifies that resources are correctly unregistered during lazy recovery.
-     */
-    @Test
-    public void testConcurrentUnregisterDuringLazyRecovery() throws Exception {
-        LOGGER.info("Starting testConcurrentUnregisterDuringLazyRecovery");
-
-        // We need at least 6 resources for a good test
-        final int REQUIRED_RESOURCES = 6;
-
-        // Get multiple resources from the same dataset and partition
-        List<String> resourcesInSamePartition = findResourcesInSamePartition(REQUIRED_RESOURCES);
-
-        // If we don't have enough resources, create more
-        if (resourcesInSamePartition.size() < REQUIRED_RESOURCES) {
-            LOGGER.info("Only found {} resources, creating additional resources", resourcesInSamePartition.size());
-
-            // Get dataset and partition info from the existing resources, or create new ones
-            int datasetId;
-            int partitionId;
-            String datasetName;
-
-            if (!resourcesInSamePartition.isEmpty()) {
-                // Use the same dataset and partition as existing resources
-                String existingResource = resourcesInSamePartition.get(0);
-                LocalResource localResource = concurrentTest.getMockResources().get(existingResource);
-                DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-                datasetId = datasetLocalResource.getDatasetId();
-                partitionId = datasetLocalResource.getPartition();
-
-                // Extract dataset name from the existing resource path
-                Pattern datasetPattern = Pattern.compile(".*?/partition_\\d+/[^/]+/[^/]+/([^/]+)/\\d+/.*");
-                Matcher matcher = datasetPattern.matcher(existingResource);
-                if (matcher.matches()) {
-                    datasetName = matcher.group(1);
-                    LOGGER.info("Using dataset name from existing resource: {}", datasetName);
-                } else {
-                    datasetName = "testDs";
-                    LOGGER.warn("Could not extract dataset name from {}, using default: {}", existingResource,
-                            datasetName);
-                }
-            } else {
-                // Create new dataset and partition
-                datasetId = 999; // Use a unique dataset ID
-                partitionId = 1; // Use partition 1
-                datasetName = "testDs";
-            }
-
-            LOGGER.info("Creating resources for dataset {} partition {}", datasetId, partitionId);
-
-            // Create additional resources until we reach the required count
-            for (int i = resourcesInSamePartition.size(); i < REQUIRED_RESOURCES; i++) {
-                String indexName = "test_index_" + i;
-                // Use the dataset name extracted from existing resources
-                String resourcePath = String.format("%s/partition_%d/%s/%s/%s/0/%s", "storage", partitionId, "Default",
-                        "SDefault", datasetName, indexName);
-
-                // Create new mock resources
-                setupMockResourceForTest(resourcePath, datasetId, partitionId, datasetId * 1000L + i);
-                resourcesInSamePartition.add(resourcePath);
-                // LOGGER.info("Created additional resource: {}", resourcePath);
-            }
-        }
-
-        LOGGER.info("Have {} resources for test", resourcesInSamePartition.size());
-
-        // Register ALL resources
-        List<String> registeredResources = new ArrayList<>();
-        for (String resourcePath : resourcesInSamePartition) {
-            IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
-            assertNotNull("Index should be registered successfully: " + resourcePath, index);
-            registeredResources.add(resourcePath);
-            LOGGER.info("Registered resource: {}", resourcePath);
-        }
-
-        // We'll use 1/3 of resources for unregistering
-        int unregisterCount = Math.max(2, resourcesInSamePartition.size() / 3);
-
-        // Split resources: one for opening, several for unregistering, rest for recovery
-        String resourceForOpen = registeredResources.get(0);
-        List<String> resourcesToUnregister = registeredResources.subList(1, 1 + unregisterCount);
-        List<String> otherResources = registeredResources.subList(1 + unregisterCount, registeredResources.size());
-
-        LOGGER.info("Using {} resources: 1 for open, {} for unregistering, {} for verification",
-                registeredResources.size(), resourcesToUnregister.size(), otherResources.size());
-
-        // Get the dataset and partition ID for tracking
-        LocalResource localResourceA = concurrentTest.getMockResources().get(resourceForOpen);
-        DatasetLocalResource datasetLocalResourceA = (DatasetLocalResource) localResourceA.getResource();
-        int datasetId = datasetLocalResourceA.getDatasetId();
-        int partitionId = datasetLocalResourceA.getPartition();
-
-        LOGGER.info("Test using dataset {} partition {}", datasetId, partitionId);
-
-        // Set up mock for getResources to return actual resources during recovery
-        ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
-        doAnswer(inv -> {
-            List<FileReference> refs = inv.getArgument(1);
-            if (refs != null && !refs.isEmpty()) {
-                // Create a map of resources to return
-                Map<Long, LocalResource> result = new HashMap<>();
-                // Add our test resources from the same partition
-                for (String path : registeredResources) {
-                    LocalResource res = concurrentTest.getMockResources().get(path);
-                    if (res != null) {
-                        result.put(res.getId(), res);
-                        LOGGER.info("Adding resource to recovery batch: {}", path);
-                    }
-                }
-                return result;
-            }
-            return Collections.emptyMap();
-        }).when(mockRepository).getResources(any(), any());
-
-        // Ensure operation tracker is properly initialized for the dataset resources
-        DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
-        if (datasetResource.getOpTracker(partitionId) == null) {
-            PrimaryIndexOperationTracker opTracker = mock(PrimaryIndexOperationTracker.class);
-            datasetResource.setPrimaryIndexOperationTracker(partitionId, opTracker);
-            LOGGER.info("Created and set operation tracker for dataset {} partition {}", datasetId, partitionId);
-        }
-
-        // Verify operation tracker exists before proceeding
-        assertNotNull("Operation tracker should be initialized for partition " + partitionId,
-                datasetResource.getOpTracker(partitionId));
-
-        // Execute operations with controlled timing
-        CountDownLatch recoveryStartedLatch = new CountDownLatch(1);
-        CountDownLatch unregisterCompleteLatch = new CountDownLatch(1);
-
-        // Mock the recovery to signal when it starts
-        IRecoveryManager mockRecoveryManager = concurrentTest.getMockRecoveryManager();
-        doAnswer(invocation -> {
-            List<ILSMIndex> indexes = invocation.getArgument(0);
-            LOGGER.info("Recovery started with {} indexes", indexes.size());
-
-            // Signal that recovery has started
-            recoveryStartedLatch.countDown();
-
-            // Wait for unregister to complete before continuing
-            LOGGER.info("Waiting for unregister to complete");
-            unregisterCompleteLatch.await(10, TimeUnit.SECONDS);
-            LOGGER.info("Continuing with recovery after unregister");
-
-            // Mark resources as recovered
-            for (ILSMIndex index : indexes) {
-                String path = index.toString();
-                recoveredResourcePaths.add(path);
-                // Remove individual resource logs
-            }
-            // Add a summary after the loop
-            LOGGER.info("Marked {} resources as recovered", indexes.size());
-
-            return null;
-        }).when(mockRecoveryManager).recoverIndexes(anyList());
-
-        // Thread 1: Open resource for open (will trigger recovery for all)
-        Future<?> openFuture = executorService.submit(() -> {
-            try {
-                LOGGER.info("Thread 1: Opening resource to trigger recovery...");
-                concurrentTest.getDatasetLifecycleManager().open(resourceForOpen);
-                LOGGER.info("Thread 1: Successfully opened resource");
-            } catch (Exception e) {
-                LOGGER.error("Thread 1: Error opening resource", e);
-            }
-        });
-
-        // Wait for recovery to start
-        LOGGER.info("Waiting for recovery to start");
-        boolean recoveryStarted = recoveryStartedLatch.await(10, TimeUnit.SECONDS);
-        assertTrue("Recovery should have started", recoveryStarted);
-        LOGGER.info("Recovery has started");
-
-        // Thread 2: Unregister multiple resources while recovery is happening
-        Future<?> unregisterFuture = executorService.submit(() -> {
-            try {
-                LOGGER.info("Thread 2: Unregistering {} resources...", resourcesToUnregister.size());
-
-                for (String resourcePath : resourcesToUnregister) {
-                    concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
-                }
-
-                // Signal that unregister is complete
-                unregisterCompleteLatch.countDown();
-                LOGGER.info("Thread 2: Successfully unregistered all resources");
-            } catch (Exception e) {
-                LOGGER.error("Thread 2: Error unregistering resources", e);
-                // Ensure we don't deadlock in case of failure
-                unregisterCompleteLatch.countDown();
-            }
-        });
-
-        // Wait for unregister to complete
-        unregisterFuture.get(50, TimeUnit.SECONDS);
-
-        // Wait for open to complete
-        openFuture.get(10, TimeUnit.SECONDS);
-
-        // Verify expectations
-        LOGGER.info("Checking final state");
-
-        // Check which resources were recovered
-        List<String> recoveredResources = new ArrayList<>();
-        List<String> nonRecoveredResources = new ArrayList<>();
-
-        for (String resourcePath : registeredResources) {
-            if (isResourcePathRecovered(resourcePath)) {
-                recoveredResources.add(resourcePath);
-            } else {
-                nonRecoveredResources.add(resourcePath);
-            }
-        }
-
-        LOGGER.info("Results: {} resources recovered, {} resources not recovered", recoveredResources.size(),
-                nonRecoveredResources.size());
-
-        // Resource for open should have been opened successfully
-        assertTrue("Resource for open should be recovered", isResourcePathRecovered(resourceForOpen));
-
-        // Resources to unregister should be unregistered
-        for (String resourcePath : resourcesToUnregister) {
-            IIndex indexAfter = concurrentTest.getDatasetLifecycleManager().get(resourcePath);
-            assertNull("Resource should be unregistered: " + resourcePath, indexAfter);
-        }
-
-        // Other resources should either be recovered or not, depending on timing
-        LOGGER.info("Completed testConcurrentUnregisterDuringLazyRecovery");
-    }
-
-    /**
-     * Helper method to find resources that belong to the same dataset and partition.
-     * 
-     * @param minCount Minimum number of resources needed
-     * @return List of resource paths from the same dataset and partition
-     */
-    private List<String> findResourcesInSamePartition(int minCount) {
-        // Group resources by dataset ID and partition ID
-        Map<String, List<String>> resourcesByDatasetAndPartition = new HashMap<>();
-
-        LOGGER.info("Looking for at least {} resources in the same partition", minCount);
-
-        for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
-            LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
-            if (localResource != null) {
-                DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-                int datasetId = datasetLocalResource.getDatasetId();
-                int partitionId = datasetLocalResource.getPartition();
-
-                // Create a composite key using datasetId and partitionId
-                String key = datasetId + "_" + partitionId;
-                resourcesByDatasetAndPartition.computeIfAbsent(key, k -> new ArrayList<>()).add(resourcePath);
-            }
-        }
-
-        // Just log summary, not details
-        LOGGER.info("Found {} groups of resources by dataset/partition", resourcesByDatasetAndPartition.size());
-
-        // Find a group with enough resources
-        for (List<String> resources : resourcesByDatasetAndPartition.values()) {
-            if (resources.size() >= minCount) {
-                String firstResource = resources.get(0);
-                LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
-                DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-
-                LOGGER.info("Found suitable group with {} resources (dataset={}, partition={})", resources.size(),
-                        datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
-                return resources;
-            }
-        }
-
-        // If no group has enough resources, return the one with the most
-        List<String> bestGroup = resourcesByDatasetAndPartition.values().stream()
-                .max(Comparator.comparingInt(List::size)).orElse(Collections.emptyList());
-
-        if (!bestGroup.isEmpty()) {
-            String firstResource = bestGroup.get(0);
-            LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
-            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
-
-            LOGGER.info("Using best available group: {} resources (dataset={}, partition={})", bestGroup.size(),
-                    datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
-        } else {
-            LOGGER.warn("Could not find any resources in the same partition");
-        }
-
-        return bestGroup;
-    }
-
-    /**
-     * Helper method to check if a resource path is contained in the recovered paths.
-     * This handles potential differences between the resource path string and the index.toString() format.
-     */
-    private boolean isResourcePathRecovered(String resourcePath) {
-        // Direct match check
-        if (recoveredResourcePaths.contains(resourcePath)) {
-            return true;
-        }
-
-        // For each recovered path, check if it contains the resource path
-        for (String recoveredPath : recoveredResourcePaths) {
-            if (recoveredPath.equals(resourcePath) ||
-            // Check if the recoveredPath contains the resourcePath (for formatted toString values)
-                    recoveredPath.contains(resourcePath)) {
-                return true;
-            }
-        }
-        return false;
-    }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 320d089..cb4e068 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -138,13 +138,9 @@
 
     @Override
     public LocalResource get(String relativePath) throws HyracksDataException {
-        LocalResource resource = getLocalResourceFromCache(relativePath);
-        if (resource != null) {
-            return resource;
-        }
         beforeReadAccess();
         try {
-            resource = resourceCache.getIfPresent(relativePath);
+            LocalResource resource = resourceCache.getIfPresent(relativePath);
             if (resource == null) {
                 FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
                 resource = readLocalResource(resourceFile);
@@ -158,20 +154,6 @@
         }
     }
 
-    private LocalResource getLocalResourceFromCache(String relativePath) {
-        LocalResource resource;
-        beforeWriteAccess();
-        try {
-            resource = resourceCache.getIfPresent(relativePath);
-            if (resource != null) {
-                return resource;
-            }
-        } finally {
-            afterWriteAccess();
-        }
-        return null;
-    }
-
     @SuppressWarnings("squid:S1181")
     @Override
     public void insert(LocalResource resource) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 3f87c6d..f5edc74 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -68,7 +68,7 @@
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
         localResourceRepository = localResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
-        lcManager = new IndexLifecycleManager(appCtx, localResourceRepository);
+        lcManager = new IndexLifecycleManager();
     }
 
     public void close() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index b03f54f..8a67c71 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -109,11 +109,6 @@
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
-        IIndex registeredIndex = lcManager.registerIfAbsent(resourceRelPath, index);
-        if (registeredIndex != index) {
-            // some other thread has registered the index
-            // indicating this is not the first time, the index is being created
-            throw new HyracksDataException("Index with resource ID " + resourceId + " already exists.");
-        }
+        lcManager.register(resourceRelPath, index);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 550ad24..b79c3b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -20,25 +20,32 @@
 package org.apache.hyracks.storage.am.common.dataflow;
 
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
-@NotThreadSafe
 public class IndexDataflowHelper implements IIndexDataflowHelper {
 
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final INCServiceContext ctx;
     private final IResourceLifecycleManager<IIndex> lcManager;
     private final ILocalResourceRepository localResourceRepository;
     private final FileReference resourceRef;
     private IIndex index;
 
-    public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef) {
+    public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef)
+            throws HyracksDataException {
+        this.ctx = ctx;
         this.lcManager = storageMgr.getLifecycleManager(ctx);
         this.localResourceRepository = storageMgr.getLocalResourceRepository(ctx);
         this.resourceRef = resourceRef;
@@ -51,18 +58,56 @@
 
     @Override
     public void open() throws HyracksDataException {
-        index = lcManager.registerIfAbsent(resourceRef.getRelativePath(), index);
-        lcManager.open(resourceRef.getRelativePath());
+        //Get local resource file
+        synchronized (lcManager) {
+            index = lcManager.get(resourceRef.getRelativePath());
+            if (index == null) {
+                LocalResource lr = readIndex();
+                lcManager.register(lr.getPath(), index);
+            }
+            lcManager.open(resourceRef.getRelativePath());
+        }
+    }
+
+    private LocalResource readIndex() throws HyracksDataException {
+        // Get local resource
+        LocalResource lr = getResource();
+        if (lr == null) {
+            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourceRef.getRelativePath());
+        }
+        IResource resource = lr.getResource();
+        index = resource.createInstance(ctx);
+        return lr;
     }
 
     @Override
     public void close() throws HyracksDataException {
-        lcManager.close(resourceRef.getRelativePath());
+        synchronized (lcManager) {
+            lcManager.close(resourceRef.getRelativePath());
+        }
     }
 
     @Override
     public void destroy() throws HyracksDataException {
-        lcManager.destroy(resourceRef.getRelativePath());
+        LOGGER.log(Level.INFO, "Dropping index " + resourceRef.getRelativePath() + " on node " + ctx.getNodeId());
+        synchronized (lcManager) {
+            index = lcManager.get(resourceRef.getRelativePath());
+            if (index != null) {
+                lcManager.unregister(resourceRef.getRelativePath());
+            } else {
+                readIndex();
+            }
+
+            if (getResourceId() != -1) {
+                localResourceRepository.delete(resourceRef.getRelativePath());
+            }
+            index.destroy();
+        }
+    }
+
+    private long getResourceId() throws HyracksDataException {
+        LocalResource lr = localResourceRepository.get(resourceRef.getRelativePath());
+        return lr == null ? -1 : lr.getId();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index dd80bcb..ab301a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -25,33 +25,24 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.common.IIndex;
-import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
-import org.apache.hyracks.storage.common.LocalResource;
 
 public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, ILifeCycleComponent {
     private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
 
     private final Map<String, IndexInfo> indexInfos;
     private final long memoryBudget;
-    private final INCServiceContext appCtx;
-    private final ILocalResourceRepository resourceRepository;
     private long memoryUsed;
 
-    public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository) {
-        this(appCtx, resourceRepository, DEFAULT_MEMORY_BUDGET);
+    public IndexLifecycleManager() {
+        this(DEFAULT_MEMORY_BUDGET);
     }
 
-    public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository,
-            long memoryBudget) {
-        this.appCtx = appCtx;
-        this.resourceRepository = resourceRepository;
+    public IndexLifecycleManager(long memoryBudget) {
         this.indexInfos = new HashMap<>();
         this.memoryBudget = memoryBudget;
         this.memoryUsed = 0;
@@ -168,34 +159,11 @@
     }
 
     @Override
-    public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
+    public void register(String resourcePath, IIndex index) throws HyracksDataException {
         if (indexInfos.containsKey(resourcePath)) {
-            return indexInfos.get(resourcePath).index;
-        }
-        if (index == null) {
-            index = getOrCreate(resourcePath);
+            throw new HyracksDataException("Index with resource name " + resourcePath + " already exists.");
         }
         indexInfos.put(resourcePath, new IndexInfo(index));
-        return index;
-    }
-
-    public IIndex getOrCreate(String resourcePath) throws HyracksDataException {
-        IIndex index = get(resourcePath);
-        if (index == null) {
-            index = readIndex(resourcePath);
-            registerIfAbsent(resourcePath, index);
-        }
-        return index;
-    }
-
-    private IIndex readIndex(String resourcePath) throws HyracksDataException {
-        // Get local resource
-        LocalResource lr = resourceRepository.get(resourcePath);
-        if (lr == null) {
-            throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
-        }
-        IResource resource = lr.getResource();
-        return resource.createInstance(appCtx);
     }
 
     @Override
@@ -241,24 +209,4 @@
         }
         indexInfos.remove(resourcePath);
     }
-
-    @Override
-    public void destroy(String resourcePath) throws HyracksDataException {
-        IIndex index = get(resourcePath);
-        if (index != null) {
-            unregister(resourcePath);
-        } else {
-            readIndex(resourcePath);
-        }
-
-        if (getResourceId(resourcePath) != -1) {
-            resourceRepository.delete(resourcePath);
-        }
-        index.destroy();
-    }
-
-    private long getResourceId(String resourcePath) throws HyracksDataException {
-        LocalResource lr = resourceRepository.get(resourcePath);
-        return lr == null ? -1 : lr.getId();
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
index 1d74379..6200680 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
@@ -41,8 +41,10 @@
      *
      * @param resourceId
      * @param resource
+     * @throws HyracksDataException
+     *             if a resource is already registered with this resourceId
      */
-    public R registerIfAbsent(String resourceId, R resource) throws HyracksDataException;
+    public void register(String resourceId, R resource) throws HyracksDataException;
 
     /**
      * Opens a resource. The resource is moved to the open state
@@ -73,15 +75,8 @@
     /**
      * unregister a resource removing its resources in memory and on disk
      *
-     * @param resourcePath
+     * @param resourceId
      * @throws HyracksDataException
      */
-    public void unregister(String resourcePath) throws HyracksDataException;
-
-    /**
-     * delete the resource
-     * @param resourcePath
-     * @throws HyracksDataException
-     */
-    public void destroy(String resourcePath) throws HyracksDataException;
+    public void unregister(String resourceId) throws HyracksDataException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index 1328bc4..0b4d2ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -59,7 +59,7 @@
 
     @Override
     public IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx) {
-        return TestStorageManagerComponentHolder.getIndexLifecycleManager(ctx);
+        return TestStorageManagerComponentHolder.getIndexLifecycleManager();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 1f7c0ab..7f696b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -78,9 +78,9 @@
         lcManager = null;
     }
 
-    public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager(INCServiceContext ctx) {
+    public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() {
         if (lcManager == null) {
-            lcManager = new IndexLifecycleManager(ctx, getLocalResourceRepository());
+            lcManager = new IndexLifecycleManager();
         }
         return lcManager;
     }