[ASTERIXDB-3574][STO] Taking resource-level lock instead of global lock
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Currently, all dataset and index metadata operations (`create`,
`register`, `open`, `close`, and `unregister`) in
`DatasetLifecycleManager` are synchronized using a global lock,
leading to contention and limiting concurrency.
To improve performance, the global lock is replaced with
resource-specific locks where applicable. This allows operations
on different datasets or indexes to proceed in parallel.
Ext-ref: MB-65695
Change-Id: I9e3b931b363f082f0a7c69a0454adfd37937bb60
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19484
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 269f6f7..5035d25 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -284,7 +284,6 @@
long lsn = -1;
ILSMIndex index = null;
LocalResource localResource = null;
- DatasetLocalResource localResourceMetadata = null;
boolean foundWinner = false;
JobEntityCommits jobEntityWinners = null;
@@ -354,12 +353,11 @@
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
//get partition path in this node
- localResourceMetadata = (DatasetLocalResource) localResource.getResource();
index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
- index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
- datasetLifecycleManager.register(localResource.getPath(), index);
+ index = (ILSMIndex) datasetLifecycleManager.registerIfAbsent(localResource.getPath(),
+ null);
datasetLifecycleManager.open(localResource.getPath());
try {
maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 952185b..7f3642b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -19,10 +19,10 @@
package org.apache.asterix.common.context;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.common.api.IIOBlockingOperation;
import org.apache.asterix.common.transactions.ILogManager;
@@ -57,8 +57,8 @@
private boolean durable;
public DatasetInfo(int datasetID, ILogManager logManager) {
- this.partitionIndexes = new HashMap<>();
- this.indexes = new HashMap<>();
+ this.partitionIndexes = new ConcurrentHashMap<>();
+ this.indexes = new ConcurrentHashMap<>();
this.partitionPendingIO = new Int2IntOpenHashMap();
this.setLastAccess(-1);
this.datasetID = datasetID;
@@ -200,13 +200,13 @@
return Collections.unmodifiableMap(indexes);
}
- public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
+ public void addIndex(long resourceID, IndexInfo indexInfo) {
indexes.put(resourceID, indexInfo);
partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
LOGGER.debug("registered reference to index {}", indexInfo);
}
- public synchronized void removeIndex(long resourceID) {
+ public void removeIndex(long resourceID) {
IndexInfo info = indexes.remove(resourceID);
if (info != null) {
partitionIndexes.get(info.getPartition()).remove(info);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 85916b0..c6edb4e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,10 +23,13 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
@@ -62,13 +65,16 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.util.annotations.ThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+@ThreadSafe
public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
private static final Logger LOGGER = LogManager.getLogger();
@@ -82,6 +88,8 @@
private final LogRecord waitLog;
protected final IDiskResourceCacheLockNotifier lockNotifier;
private volatile boolean stopped = false;
+ private final ReentrantReadWriteLock stopLock;
+ private final Map<String, ReentrantReadWriteLock> resourceLocks;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
// all LSM-trees share the same virtual buffer cache list
private final List<IVirtualBufferCache> vbcs;
@@ -96,6 +104,7 @@
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.vbc = vbc;
+ this.stopLock = new ReentrantReadWriteLock();
int numMemoryComponents = storageProperties.getMemoryComponentsNum();
this.vbcs = new ArrayList<>(numMemoryComponents);
for (int i = 0; i < numMemoryComponents; i++) {
@@ -103,13 +112,14 @@
}
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.lockNotifier = lockNotifier;
+ this.resourceLocks = Collections.synchronizedMap(new WeakHashMap<>());
waitLog = new LogRecord();
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
}
@Override
- public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
+ public ILSMIndex get(String resourcePath) throws HyracksDataException {
validateDatasetLifecycleManagerState();
int datasetID = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -117,7 +127,7 @@
}
@Override
- public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+ public ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
validateDatasetLifecycleManagerState();
DatasetResource datasetResource = datasets.get(datasetID);
if (datasetResource == null) {
@@ -127,16 +137,52 @@
}
@Override
- public synchronized void register(String resourcePath, IIndex index) throws HyracksDataException {
- validateDatasetLifecycleManagerState();
- int did = getDIDfromResourcePath(resourcePath);
- LocalResource resource = resourceRepository.get(resourcePath);
- DatasetResource datasetResource = datasets.get(did);
- lockNotifier.onRegister(resource, index);
- if (datasetResource == null) {
- datasetResource = getDatasetLifecycle(did);
+ public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
+ stopLock.readLock().lock();
+ try {
+ validateDatasetLifecycleManagerState();
+
+ IIndex existingIndex = get(resourcePath);
+ if (existingIndex != null) {
+ return existingIndex;
+ }
+
+ ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+ resourceLock.writeLock().lock();
+ try {
+ existingIndex = get(resourcePath);
+ if (existingIndex != null) {
+ return existingIndex;
+ }
+
+ if (index == null) {
+ index = getOrCreateIndex(resourcePath);
+ }
+
+ int datasetID = getDIDfromResourcePath(resourcePath);
+ LocalResource resource = resourceRepository.get(resourcePath);
+ lockNotifier.onRegister(resource, index);
+
+ DatasetResource datasetResource = datasets.get(datasetID);
+ if (datasetResource == null) {
+ datasetResource = getDatasetLifecycle(datasetID);
+ }
+
+ datasetResource.register(resource, (ILSMIndex) index);
+ } finally {
+ resourceLock.writeLock().unlock();
+ }
+ } finally {
+ stopLock.readLock().unlock();
}
- datasetResource.register(resource, (ILSMIndex) index);
+
+ return index;
+ }
+
+ private ReentrantReadWriteLock getResourceLock(String resourcePath) {
+ // create fair locks inorder to avoid starving.
+ // actually I believe there shouldn't be any kinda starving as the separate locks are created for each resource.
+ return resourceLocks.computeIfAbsent(resourcePath, k -> new ReentrantReadWriteLock(true));
}
protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -163,115 +209,261 @@
return (DatasetLocalResource) lr.getResource();
}
+ /**
+ * Concurrency considerations for dataset operations:
+ *
+ * This method requires dataset locks to handle concurrent operations:
+ *
+ * 1. Dataset-level locking:
+ * - The open() method works on all indexes of a dataset.
+ * - The first-time open() call triggers recoverIndex(), which acquires the dataset lock.
+ * - Race conditions may occur if index recovery happens while an index is being dropped.
+ *
+ * 2. Lock acquisition strategy:
+ * - Both the dataset lock and the local resource lock must be acquired.
+ * - This prevents register/open operations from occurring while an unregister operation is in progress.
+ * - These locks must be held until the unregister operation completes.
+ *
+ * 3. Possible race scenarios:
+ * Consider the following threads:
+ * - t1: unregister(ds/0/idx1)
+ * - t2: open(ds/0/idx1)
+ * - t3: unregister(ds/0/idx2)
+ * - t4: unregister(newDs/0/idx)
+ * - If t2 is running, t1 and t3 should wait (same dataset), but t4 can proceed (different dataset).
+ * - If t2 hasn't started yet (dataset lock not acquired), t1 and t3 could execute.
+ *
+ * 4. Race condition handling:
+ * - If t1 starts unregistering, t2 starts opening, and t3 starts unregistering simultaneously:
+ * - t2 takes the dataset lock.
+ * - Depending on the timing of registration completion, the resource repository may or may not contain the index.
+ * - If the index does not exist, an INDEX_DOES_NOT_EXIST error will occur.
+ *
+ * Note: At the compilation layer, exclusive dataset locks prevent DROP/UNREGISTER operations
+ * from colliding with OPEN/REGISTER operations.
+ */
@Override
- public synchronized void unregister(String resourcePath) throws HyracksDataException {
- validateDatasetLifecycleManagerState();
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
- DatasetResource dsr = datasets.get(did);
- IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
-
- if (dsr == null || iInfo == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
- }
-
- lockNotifier.onUnregister(resourceID);
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
- if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- if (LOGGER.isErrorEnabled()) {
- final String logMsg = String.format(
- "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
- resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
- LOGGER.error(logMsg);
- }
- throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
- StoragePathUtil.getIndexNameFromPath(resourcePath));
- }
-
- // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
- DatasetInfo dsInfo = dsr.getDatasetInfo();
- dsInfo.waitForIO();
- closeIndex(iInfo);
- dsInfo.removeIndex(resourceID);
- synchronized (dsInfo) {
- int referenceCount = dsInfo.getReferenceCount();
- boolean open = dsInfo.isOpen();
- boolean empty = dsInfo.getIndexes().isEmpty();
- if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
- LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
- removeDatasetFromCache(dsInfo.getDatasetID());
- } else {
- LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
- dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
- }
- }
- }
-
- @Override
- public synchronized void open(String resourcePath) throws HyracksDataException {
- validateDatasetLifecycleManagerState();
- DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
- if (localResource == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
- }
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
-
- lockNotifier.onOpen(resourceID);
+ public void unregister(String resourcePath) throws HyracksDataException {
+ stopLock.readLock().lock();
try {
- DatasetResource datasetResource = datasets.get(did);
- int partition = localResource.getPartition();
- if (shouldRecoverLazily(datasetResource, partition)) {
- performLocalRecovery(resourcePath, datasetResource, partition);
- } else {
- openResource(resourcePath, false);
+ validateDatasetLifecycleManagerState();
+ String datasetPartitionPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
+
+ ReentrantReadWriteLock partitionResourceLock = getResourceLock(datasetPartitionPath);
+ ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+ partitionResourceLock.writeLock().lock();
+ try {
+ resourceLock.writeLock().lock();
+ try {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+
+ if (dsr == null || iInfo == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+ }
+
+ lockNotifier.onUnregister(resourceID);
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0
+ || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ int referenceCount = dsInfo.getReferenceCount();
+ boolean open = dsInfo.isOpen();
+ boolean empty = dsInfo.getIndexes().isEmpty();
+ if (referenceCount == 0 && open && empty && !dsInfo.isExternal()) {
+ LOGGER.debug("removing dataset {} from cache", dsInfo.getDatasetID());
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ } else {
+ LOGGER.debug("keeping dataset {} in cache, ref count {}, open {}, indexes count: {}",
+ dsInfo.getDatasetID(), referenceCount, open, dsInfo.getIndexes().size());
+ }
+ }
+ } finally {
+ resourceLock.writeLock().unlock();
+ }
+ } finally {
+ partitionResourceLock.writeLock().unlock();
}
} finally {
- lockNotifier.onClose(resourceID);
+ stopLock.readLock().unlock();
}
}
- private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
- throws HyracksDataException {
- LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
- partition);
- FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath);
- Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
+ @Override
+ public void destroy(String resourcePath) throws HyracksDataException {
+ stopLock.readLock().lock();
+ try {
+ ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+ resourceLock.writeLock().lock();
+ try {
+ LOGGER.info("Dropping index {} on node {}", resourcePath, serviceCtx.getNodeId());
+ IIndex index = get(resourcePath);
+ if (index != null) {
+ unregister(resourcePath);
+ } else {
+ index = readIndex(resourcePath);
+ }
+ if (getResourceId(resourcePath) != -1) {
+ resourceRepository.delete(resourcePath);
+ }
+ index.destroy();
+ } finally {
+ resourceLock.writeLock().unlock();
+ }
+ } finally {
+ stopLock.readLock().unlock();
+ }
+ }
- List<ILSMIndex> indexes = new ArrayList<>();
- for (LocalResource resource : resources.values()) {
- if (shouldSkipResource(resource)) {
- continue;
+ private long getResourceId(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.get(resourcePath);
+ return lr == null ? -1 : lr.getId();
+ }
+
+ @Override
+ public void open(String resourcePath) throws HyracksDataException {
+ stopLock.readLock().lock();
+ try {
+ validateDatasetLifecycleManagerState();
+
+ DatasetLocalResource localResource = getDatasetLocalResource(resourcePath);
+ if (localResource == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
}
- ILSMIndex index = getOrCreateIndex(resource);
- boolean undoTouch = !resourcePath.equals(resource.getPath());
- openResource(resource.getPath(), undoTouch);
- indexes.add(index);
- }
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
- if (!indexes.isEmpty()) {
- recoveryMgr.recoverIndexes(indexes);
- }
+ // Notify before opening a resource
+ lockNotifier.onOpen(resourceID);
+ try {
+ DatasetResource datasetResource = datasets.get(did);
+ int partition = localResource.getPartition();
+ boolean lazyRecover = shouldRecoverLazily(datasetResource, partition);
- datasetResource.markRecovered(partition);
+ if (!lazyRecover) {
+ openResource(resourcePath, false);
+ return;
+ }
+
+ // Perform local recovery by taking a lock on the root resource
+ boolean recoveredByCurrentThread = performLocalRecovery(resourcePath, datasetResource, partition);
+
+ /*
+ * Concurrent Access Scenario for Index Resource Recovery:
+ * ------------------------------------------------------
+ * When multiple threads attempt to open the same index resource, a race
+ * condition can occur.
+ *
+ * Example:
+ * - Thread1 (handling ds/0/idx1) and Thread2 (handling ds/0/idx2) may both
+ * try to recover dataset indexes.
+ * - Thread1 enters `ensureIndexOpenAndConsistent()`, detects that the resource
+ * is not recovered, and starts recovery.
+ * - Before Thread1 marks recovery as complete, Thread2 also checks and finds
+ * the resource not recovered, so it attempts recovery too.
+ *
+ * However, index recovery is an expensive operation and should be performed by
+ * only one thread. To prevent multiple recoveries, `ensureIndexOpenAndConsistent()`
+ * must be synchronized on the root dataset resource. This ensures that only one
+ * thread performs the recovery, while others wait.
+ *
+ * Behavior After Synchronization:
+ * - Thread1 recovers and marks the index as recovered.
+ * - Thread2, after acquiring the lock, sees that the index is already recovered.
+ * - It returns `false` from `ensureIndexOpenAndConsistent()` and proceeds to call
+ * `open()`.
+ * - Since `open()` is idempotent, if the index is already open (`iInfo.isOpen()`),
+ * it does nothing except incrementing open stats.
+ */
+ if (!recoveredByCurrentThread) {
+ openResource(resourcePath, false);
+ }
+ } finally {
+ lockNotifier.onClose(resourceID);
+ }
+ } finally {
+ stopLock.readLock().unlock();
+ }
}
- private boolean shouldSkipResource(LocalResource resource) {
+ private boolean performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition)
+ throws HyracksDataException {
+
+ String indexRootRefPath = StoragePathUtil.getDatasetPartitionPath(resourcePath);
+ ReentrantReadWriteLock resourceLock = getResourceLock(indexRootRefPath);
+ FileReference indexRootRef = serviceCtx.getIoManager().resolve(indexRootRefPath);
+ resourceLock.writeLock().lock();
+ try {
+ if (!shouldRecoverLazily(datasetResource, partition)) {
+ return false;
+ }
+ LOGGER.debug("performing local recovery for dataset {} partition {}", datasetResource.getDatasetInfo(),
+ partition);
+ Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef));
+
+ List<ILSMIndex> indexes = new ArrayList<>();
+ for (LocalResource resource : resources.values()) {
+ if (shouldSkipRecoveringResource(resource)) {
+ continue;
+ }
+
+ ILSMIndex index = (ILSMIndex) registerIfAbsent(resource.getPath(), null);
+ boolean undoTouch = !resourcePath.equals(resource.getPath());
+ openResource(resource.getPath(), undoTouch);
+ indexes.add(index);
+ }
+
+ if (!indexes.isEmpty()) {
+ recoveryMgr.recoverIndexes(indexes);
+ }
+
+ datasetResource.markRecovered(partition);
+ return true;
+ } finally {
+ resourceLock.writeLock().unlock();
+ }
+ }
+
+ private boolean shouldSkipRecoveringResource(LocalResource resource) {
DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
|| (lr.getResource() instanceof LSMBTreeLocalResource
&& ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance());
}
- private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException {
- ILSMIndex index = get(resource.getPath());
- if (index == null) {
- DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
- index = (ILSMIndex) lr.createInstance(serviceCtx);
- register(resource.getPath(), index);
+ private IIndex getOrCreateIndex(String resourcePath) throws HyracksDataException {
+ IIndex index = get(resourcePath);
+ if (index != null) {
+ return index;
}
- return index;
+ return readIndex(resourcePath);
+ }
+
+ private IIndex readIndex(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.get(resourcePath);
+ if (lr == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+ }
+ IResource resource = lr.getResource();
+ return resource.createInstance(serviceCtx);
}
private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException {
@@ -300,11 +492,15 @@
boolean indexTouched = false;
try {
if (!iInfo.isOpen()) {
- ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
- synchronized (opTracker) {
- iInfo.getIndex().activate();
+ synchronized (iInfo) {
+ if (!iInfo.isOpen()) {
+ ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
+ synchronized (opTracker) {
+ iInfo.getIndex().activate();
+ }
+ iInfo.setOpen(true);
+ }
}
- iInfo.setOpen(true);
}
iInfo.touch();
indexTouched = true;
@@ -334,15 +530,7 @@
if (dsr != null) {
return dsr;
}
- synchronized (datasets) {
- dsr = datasets.get(did);
- if (dsr == null) {
- DatasetInfo dsInfo = new DatasetInfo(did, logManager);
- dsr = new DatasetResource(dsInfo);
- datasets.put(did, dsr);
- }
- return dsr;
- }
+ return datasets.computeIfAbsent(did, k -> new DatasetResource(new DatasetInfo(did, logManager)));
}
@Override
@@ -351,46 +539,62 @@
}
@Override
- public synchronized void close(String resourcePath) throws HyracksDataException {
- DatasetResource dsr = null;
- IndexInfo iInfo = null;
+ public void close(String resourcePath) throws HyracksDataException {
+ stopLock.readLock().lock();
try {
- validateDatasetLifecycleManagerState();
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
- dsr = datasets.get(did);
- if (dsr == null) {
- throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+ DatasetResource dsr = null;
+ IndexInfo iInfo = null;
+
+ // A resource lock may not be necessary if the unregister case does not need handling.
+ ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+ resourceLock.writeLock().lock();
+ try {
+ validateDatasetLifecycleManagerState();
+
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
+
+ dsr = datasets.get(did);
+ if (dsr == null) {
+ throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+ }
+
+ iInfo = dsr.getIndexInfo(resourceID);
+ if (iInfo == null) {
+ throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
+ }
+
+ lockNotifier.onClose(resourceID);
+ } finally {
+ // Regardless of any exceptions thrown in the try block (e.g., missing index),
+ // we must ensure that the index and dataset are marked as untouched.
+ if (iInfo != null) {
+ iInfo.untouch();
+ }
+ if (dsr != null) {
+ dsr.untouch();
+ }
+ resourceLock.writeLock().unlock();
}
- iInfo = dsr.getIndexInfo(resourceID);
- if (iInfo == null) {
- throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
- }
- lockNotifier.onClose(resourceID);
} finally {
- // Regardless of what exception is thrown in the try-block (e.g., line 279),
- // we have to un-touch the index and dataset.
- if (iInfo != null) {
- iInfo.untouch();
- }
- if (dsr != null) {
- dsr.untouch();
- }
+ stopLock.readLock().unlock();
}
}
@Override
- public synchronized List<IIndex> getOpenResources() {
- List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
- List<IIndex> openIndexes = new ArrayList<>();
- for (IndexInfo iInfo : openIndexesInfo) {
- openIndexes.add(iInfo.getIndex());
+ public List<IIndex> getOpenResources() {
+ synchronized (datasets) {
+ List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
+ List<IIndex> openIndexes = new ArrayList<>();
+ for (IndexInfo iInfo : openIndexesInfo) {
+ openIndexes.add(iInfo.getIndex());
+ }
+ return openIndexes;
}
- return openIndexes;
}
@Override
- public synchronized List<IndexInfo> getOpenIndexesInfo() {
+ public List<IndexInfo> getOpenIndexesInfo() {
List<IndexInfo> openIndexesInfo = new ArrayList<>();
for (DatasetResource dsr : datasets.values()) {
for (IndexInfo iInfo : dsr.getIndexes().values()) {
@@ -412,27 +616,50 @@
}
@Override
- public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
+ public PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String resourcePath) {
DatasetResource dataset = getDatasetLifecycle(datasetId);
PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
- if (opTracker == null) {
- populateOpTrackerAndIdGenerator(dataset, partition, path);
- opTracker = dataset.getOpTracker(partition);
+ if (opTracker != null) {
+ return opTracker;
}
- return opTracker;
+ ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+ resourceLock.writeLock().lock();
+ try {
+ opTracker = dataset.getOpTracker(partition);
+ if (opTracker != null) {
+ return opTracker;
+ }
+ populateOpTrackerAndIdGenerator(dataset, partition, resourcePath);
+ opTracker = dataset.getOpTracker(partition);
+ return opTracker;
+ } finally {
+ resourceLock.writeLock().unlock();
+ }
}
@Override
- public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
+ public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
- if (generator == null) {
+ if (generator != null) {
+ return generator;
+ }
+ ReentrantReadWriteLock resourceLock = getResourceLock(path);
+ resourceLock.writeLock().lock();
+ try {
+ generator = dataset.getComponentIdGenerator(partition);
+ if (generator != null) {
+ return generator;
+ }
populateOpTrackerAndIdGenerator(dataset, partition, path);
generator = dataset.getComponentIdGenerator(partition);
+ return generator;
+ } finally {
+ resourceLock.writeLock().unlock();
}
- return generator;
}
+ // this function is not being used.
@Override
public synchronized IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit) {
DatasetResource dataset = datasets.get(datasetId);
@@ -444,7 +671,7 @@
}
@Override
- public synchronized boolean isRegistered(int datasetId) {
+ public boolean isRegistered(int datasetId) {
return datasets.containsKey(datasetId);
}
@@ -476,34 +703,39 @@
}
@Override
- public synchronized void flushAllDatasets() throws HyracksDataException {
+ public void flushAllDatasets() throws HyracksDataException {
flushAllDatasets(partition -> true);
}
@Override
- public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
- for (DatasetResource dsr : datasets.values()) {
- if (dsr.getDatasetInfo().isOpen()) {
- flushDatasetOpenIndexes(dsr, partitions, false);
+ public void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
+ synchronized (datasets) {
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.getDatasetInfo().isOpen()) {
+ flushDatasetOpenIndexes(dsr, partitions, false);
+ }
}
}
}
@Override
- public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
- DatasetResource dsr = datasets.get(datasetId);
- if (dsr != null) {
- flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
+ public void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
+ synchronized (datasets) {
+ DatasetResource dsr = datasets.get(datasetId);
+ if (dsr != null) {
+ flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
+ }
}
}
@Override
- public synchronized void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate)
- throws HyracksDataException {
- for (DatasetResource dsr : datasets.values()) {
- for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
- synchronized (opTracker) {
- asyncFlush(dsr, opTracker, indexPredicate);
+ public void asyncFlushMatchingIndexes(Predicate<ILSMIndex> indexPredicate) throws HyracksDataException {
+ synchronized (datasets) {
+ for (DatasetResource dsr : datasets.values()) {
+ for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+ synchronized (opTracker) {
+ asyncFlush(dsr, opTracker, indexPredicate);
+ }
}
}
}
@@ -594,38 +826,45 @@
}
@Override
- public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
- ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
- for (DatasetResource dsr : openDatasets) {
- if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
- closeDataset(dsr);
+ public void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+ synchronized (datasets) {
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
+ closeDataset(dsr);
+ }
}
}
}
@Override
- public synchronized void closeAllDatasets() throws HyracksDataException {
- ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
- for (DatasetResource dsr : openDatasets) {
- if (dsr.isOpen()) {
- closeDataset(dsr);
+ public void closeAllDatasets() throws HyracksDataException {
+ synchronized (datasets) {
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.isOpen()) {
+ closeDataset(dsr);
+ }
}
}
}
@Override
public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
- if (stopped) {
- return;
- }
- if (dumpState) {
- dumpState(outputStream);
- }
+ stopLock.writeLock().lock();
+ try {
+ if (stopped) {
+ return;
+ }
+ if (dumpState) {
+ dumpState(outputStream);
+ }
- closeAllDatasets();
+ closeAllDatasets();
- datasets.clear();
- stopped = true;
+ datasets.clear();
+ stopped = true;
+ } finally {
+ stopLock.writeLock().unlock();
+ }
}
@Override
@@ -665,9 +904,11 @@
@Override
public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
throws HyracksDataException {
- for (DatasetResource dsr : datasets.values()) {
- if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
- flushDatasetOpenIndexes(dsr, partitions, false);
+ synchronized (datasets) {
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
+ flushDatasetOpenIndexes(dsr, partitions, false);
+ }
}
}
}
@@ -720,47 +961,60 @@
//TODO refactor this method with unregister method
@Override
- public synchronized void closeIfOpen(String resourcePath) throws HyracksDataException {
- validateDatasetLifecycleManagerState();
- int did = getDIDfromResourcePath(resourcePath);
- long resourceID = getResourceIDfromResourcePath(resourcePath);
+ public void closeIfOpen(String resourcePath) throws HyracksDataException {
+ stopLock.readLock().lock();
+ try {
+ validateDatasetLifecycleManagerState();
+ ReentrantReadWriteLock resourceLock = getResourceLock(resourcePath);
+ resourceLock.writeLock().lock();
+ try {
+ int did = getDIDfromResourcePath(resourcePath);
+ long resourceID = getResourceIDfromResourcePath(resourcePath);
- DatasetResource dsr = datasets.get(did);
- IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
+ DatasetResource dsr = datasets.get(did);
+ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
- if (dsr == null || iInfo == null) {
- return;
- }
+ if (dsr == null || iInfo == null) {
+ return;
+ }
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
- if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- if (LOGGER.isErrorEnabled()) {
- final String logMsg = String.format(
- "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
- resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
- LOGGER.error(logMsg);
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
+ if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ if (LOGGER.isErrorEnabled()) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations());
+ LOGGER.error(logMsg);
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
+ StoragePathUtil.getIndexNameFromPath(resourcePath));
+ }
+
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+ DatasetInfo dsInfo = dsr.getDatasetInfo();
+ dsInfo.waitForIO();
+ closeIndex(iInfo);
+ dsInfo.removeIndex(resourceID);
+ synchronized (dsInfo) {
+ if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
+ && !dsInfo.isExternal()) {
+ removeDatasetFromCache(dsInfo.getDatasetID());
+ }
+ }
+ } finally {
+ resourceLock.writeLock().unlock();
}
- throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
- StoragePathUtil.getIndexNameFromPath(resourcePath));
- }
-
- // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
- DatasetInfo dsInfo = dsr.getDatasetInfo();
- dsInfo.waitForIO();
- closeIndex(iInfo);
- dsInfo.removeIndex(resourceID);
- synchronized (dsInfo) {
- if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
- && !dsInfo.isExternal()) {
- removeDatasetFromCache(dsInfo.getDatasetID());
- }
+ } finally {
+ stopLock.readLock().unlock();
}
}
@Override
- public synchronized void closePartition(int partitionId) {
- for (DatasetResource ds : datasets.values()) {
- ds.removePartition(partitionId);
+ public void closePartition(int partitionId) {
+ synchronized (datasets) {
+ for (DatasetResource ds : datasets.values()) {
+ ds.removePartition(partitionId);
+ }
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8e3081d..0c8f141 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -19,10 +19,10 @@
package org.apache.asterix.common.context;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -54,9 +54,9 @@
public DatasetResource(DatasetInfo datasetInfo) {
this.datasetInfo = datasetInfo;
- this.datasetPrimaryOpTrackers = new HashMap<>();
- this.datasetComponentIdGenerators = new HashMap<>();
- this.datasetRateLimiters = new HashMap<>();
+ this.datasetPrimaryOpTrackers = new ConcurrentHashMap<>();
+ this.datasetComponentIdGenerators = new ConcurrentHashMap<>();
+ this.datasetRateLimiters = new ConcurrentHashMap<>();
this.recoveredPartitions = new HashSet<>();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 2fdfba4..9f0f5c7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -139,11 +139,9 @@
return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
}
- public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath)
- throws HyracksDataException {
+ public static String getDatasetPartitionPath(String relativePath) {
int separatorIndex = relativePath.lastIndexOf(File.separatorChar);
- String parentDirectory = relativePath.substring(0, separatorIndex);
- return ioManager.resolve(parentDirectory);
+ return relativePath.substring(0, separatorIndex);
}
/**
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
new file mode 100644
index 0000000..d15a9d4
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerConcurrentTest.java
@@ -0,0 +1,2104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Concurrent tests for the DatasetLifecycleManager class.
+ *
+ * This test class verifies that the DatasetLifecycleManager properly handles
+ * concurrent register, open, close, and unregister operations on indexes.
+ *
+ * The tests use mock objects to simulate the index lifecycle and verify:
+ * 1. Thread safety of operations
+ * 2. Correct handling of re-registration attempts
+ * 3. Proper activation/deactivation during open/close
+ * 4. Correct identity preservation for index instances
+ * 5. Concurrent access to shared resources
+ *
+ * Each test method focuses on a specific aspect of concurrent behavior:
+ * - testConcurrentRegisterAndOpen: Tests register and open operations
+ * - testConcurrentRegisterOpenAndUnregister: Tests the full lifecycle
+ * - testRegisterAlreadyRegisteredIndex: Tests re-registration behavior
+ * - testConcurrentOpenSameIndex: Tests opening the same index concurrently
+ * - testConcurrentCloseAfterOpen: Tests closing after open operations
+ * - testMockIndexIdentity: Tests the mock setup itself
+ */
+public class DatasetLifecycleManagerConcurrentTest {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ // Configuration constants
+ private static final int NUM_DATASETS = 3;
+ private static final int NUM_PARTITIONS = 2;
+ private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
+ private static final int NUM_OPERATIONS_PER_THREAD = 20;
+
+ // For resource paths
+ private static final String DB_NAME = "Default";
+ private static final String SCOPE_NAME = "SDefault";
+ private static final String STORAGE_DIR = "storage";
+
+ private DatasetLifecycleManager datasetLifecycleManager;
+ private INCServiceContext mockServiceContext;
+ private StorageProperties mockStorageProperties;
+ private ILocalResourceRepository mockResourceRepository;
+ private IRecoveryManager mockRecoveryManager;
+ private ILogManager mockLogManager;
+ private IVirtualBufferCache mockVBC;
+ private IIndexCheckpointManagerProvider mockCheckpointProvider;
+ private IDiskResourceCacheLockNotifier mockLockNotifier;
+ private IIOManager mockIOManager;
+
+ private Map<String, MockIndex> mockIndexes;
+ private Map<String, LocalResource> mockResources;
+
+ private ExecutorService executorService;
+ private List<String> createdResourcePaths;
+
+ private Map<Integer, DatasetResource> datasetResourceMap;
+
+ // Add this field after the other private fields
+ private boolean lazyRecoveryEnabled = false;
+
+ /**
+ * Sets up the test environment with mock objects and resources.
+ * This includes:
+ * 1. Creating mock service context and other components
+ * 2. Setting up mock resources with unique IDs
+ * 3. Creating the DatasetLifecycleManager with mock dependencies
+ * 4. Setting up a thread pool for concurrent operations
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Initialize collections
+ mockIndexes = new ConcurrentHashMap<>();
+ mockResources = new ConcurrentHashMap<>();
+ createdResourcePaths = new ArrayList<>();
+ datasetResourceMap = new HashMap<>();
+
+ // Set up mocks
+ mockServiceContext = mock(INCServiceContext.class);
+ mockStorageProperties = mock(StorageProperties.class);
+ mockResourceRepository = mock(ILocalResourceRepository.class);
+ mockRecoveryManager = mock(IRecoveryManager.class);
+ mockLogManager = mock(ILogManager.class);
+ mockVBC = mock(IVirtualBufferCache.class);
+ mockCheckpointProvider = mock(IIndexCheckpointManagerProvider.class);
+ mockLockNotifier = mock(IDiskResourceCacheLockNotifier.class);
+ mockIOManager = mock(IIOManager.class);
+
+ // Mock behavior
+ when(mockServiceContext.getIoManager()).thenReturn(mockIOManager);
+ when(mockStorageProperties.getMemoryComponentsNum()).thenReturn(2);
+ when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(lazyRecoveryEnabled);
+
+ // Create DatasetLifecycleManager FIRST
+ datasetLifecycleManager =
+ new DatasetLifecycleManager(mockServiceContext, mockStorageProperties, mockResourceRepository,
+ mockRecoveryManager, mockLogManager, mockVBC, mockCheckpointProvider, mockLockNotifier);
+
+ // NEXT get the datasets map via reflection
+ try {
+ Field datasetsField = DatasetLifecycleManager.class.getDeclaredField("datasets");
+ datasetsField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<Integer, DatasetResource> datasets =
+ (Map<Integer, DatasetResource>) datasetsField.get(datasetLifecycleManager);
+ if (datasets != null) {
+ LOGGER.info("Accessed datasets map successfully, found {} entries", datasets.size());
+ // Store a direct reference to the actual map
+ datasetResourceMap = datasets;
+ } else {
+ LOGGER.warn("Retrieved datasets map is null");
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to access datasets field via reflection", e);
+ }
+
+ // FINALLY setup mock resources (so they get the real datasets map)
+ setupMockResources();
+
+ executorService = Executors.newFixedThreadPool(NUM_THREADS);
+ }
+
+ /**
+ * Helper method to set the lazy recovery flag and reconfigure the mock
+ */
+ public void setLazyRecoveryEnabled(boolean enabled) {
+ this.lazyRecoveryEnabled = enabled;
+ when(mockRecoveryManager.isLazyRecoveryEnabled()).thenReturn(enabled);
+ LOGGER.info("Set lazy recovery enabled to: {}", enabled);
+ }
+
+ /**
+ * Creates mock resources for testing.
+ * For each combination of dataset and partition:
+ * 1. Creates primary and secondary indexes with unique resource IDs
+ * 2. Sets up the repository to return the appropriate resources
+ * 3. Verifies that all resources have unique IDs
+ *
+ * This is crucial for testing the register/open/unregister lifecycle,
+ * as it ensures each resource path maps to a unique mock index.
+ */
+ private void setupMockResources() throws HyracksDataException {
+ LOGGER.debug("Setting up mock resources");
+
+ // Setup mock resources for different datasets and partitions
+ for (int i = 0; i < NUM_DATASETS; i++) {
+ // Use datasetId starting from 101 to avoid reserved IDs (below 100)
+ int datasetId = 101 + i;
+ String datasetName = "ds" + i;
+ for (int j = 0; j < NUM_PARTITIONS; j++) {
+ // Create primary index
+ String primaryPath = getResourcePath(j, datasetName, 0, datasetName);
+ setupMockResource(primaryPath, datasetId, j, datasetId * 100 + j * 10);
+
+ // Create secondary index
+ String secondaryPath = getResourcePath(j, datasetName, 1, datasetName + "_idx");
+ setupMockResource(secondaryPath, datasetId, j, datasetId * 100 + j * 10 + 1);
+
+ createdResourcePaths.add(primaryPath);
+ createdResourcePaths.add(secondaryPath);
+ }
+ }
+
+ // Wire up the mockResourceRepository to return our mock resources
+ when(mockResourceRepository.get(anyString())).thenAnswer(invocation -> {
+ String path = invocation.getArgument(0);
+ LocalResource resource = mockResources.get(path);
+ LOGGER.debug("get({}) returning {}", path, resource);
+ return resource;
+ });
+
+ LOGGER.debug("Created {} mock resources", mockResources.size());
+ }
+
+ /**
+ * Sets up a mock resource for a specific resource path.
+ * For each resource path, this method:
+ * 1. Creates a unique MockIndex instance
+ * 2. Creates a mock DatasetLocalResource that returns the MockIndex
+ * 3. Creates a mock LocalResource with the unique resourceId
+ * 4. Sets up the IO manager to resolve the resource path
+ *
+ * The key pattern here is that each resource path must map to a unique
+ * MockIndex, and each resource must have a unique ID.
+ */
+ private void setupMockResource(String resourcePath, int datasetId, int partition, long resourceId)
+ throws HyracksDataException {
+
+ // Create and store a mock index, passing the datasetResourceMap
+ MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
+ mockIndexes.put(resourcePath, mockIndex);
+
+ // Create a mock dataset local resource
+ DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
+ when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
+ when(mockDatasetResource.getPartition()).thenReturn(partition);
+
+ // Important: We need to ensure each createInstance call returns the specific MockIndex
+ // for this resource path
+ when(mockDatasetResource.createInstance(mockServiceContext)).thenAnswer(invocation -> {
+ return mockIndexes.get(resourcePath);
+ });
+
+ // Create a mock local resource
+ LocalResource mockLocalResource = mock(LocalResource.class);
+ when(mockLocalResource.getId()).thenReturn(resourceId);
+ when(mockLocalResource.getPath()).thenReturn(resourcePath);
+ when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
+
+ mockResources.put(resourcePath, mockLocalResource);
+
+ // Create a mock file reference
+ FileReference mockFileRef = mock(FileReference.class);
+ when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
+ when(mockIOManager.resolveAbsolutePath(resourcePath)).thenReturn(mockFileRef);
+ }
+
+ /**
+ * Generates a standardized resource path for a given partition, dataset, and index.
+ * Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/ResourceID_IndexName
+ * This ensures consistent resource path formatting throughout the tests.
+ *
+ * Note: The replication factor is always 0 in production environments, so we match that here.
+ */
+ private String getResourcePath(int partition, String datasetName, int resourceId, String indexName) {
+ // Format: storage/partition_X/DbName/ScopeName/DatasetName/ReplicationFactor/IndexName
+ // Always use 0 for replication factor, and include resourceId as part of the index name
+ return String.format("%s/partition_%d/%s/%s/%s/0/%s_%s", STORAGE_DIR, partition, DB_NAME, SCOPE_NAME,
+ datasetName, resourceId, indexName);
+ }
+
+ /**
+ * Cleans up after tests by shutting down the executor service.
+ */
+ @After
+ public void tearDown() throws Exception {
+ executorService.shutdownNow();
+ executorService.awaitTermination(5, TimeUnit.SECONDS);
+ datasetLifecycleManager.stop(false, null);
+ }
+
+ /**
+ * Tests concurrent registration and opening of indexes.
+ * This test verifies that:
+ * 1. Multiple threads can concurrently register and open indexes without causing errors
+ * 2. Re-registration of an already registered index returns the same index instance
+ * 3. Indexes are properly activated when opened
+ * 4. All register and open operations properly maintain index state
+ */
+ @Test
+ public void testConcurrentRegisterIfAbsentAndOpen() throws Exception {
+ LOGGER.info("Starting testConcurrentRegisterAndOpen");
+
+ final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS);
+ final AtomicInteger successCount = new AtomicInteger(0);
+ final AtomicInteger errorCount = new AtomicInteger(0);
+ final Map<String, IIndex> firstRegistrations = new ConcurrentHashMap<>();
+ final List<Future<?>> futures = new ArrayList<>();
+
+ // Register half of the resources upfront
+ int preRegisteredCount = createdResourcePaths.size() / 2;
+ for (int i = 0; i < preRegisteredCount; i++) {
+ String resourcePath = createdResourcePaths.get(i);
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ firstRegistrations.put(resourcePath, index);
+ }
+
+ // Create threads that will randomly register/re-register and open indexes
+ for (int i = 0; i < NUM_THREADS; i++) {
+ futures.add(executorService.submit(() -> {
+ try {
+ barrier.await(); // Ensure all threads start at the same time
+
+ Random random = new Random();
+ for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
+ // Pick a random resource path
+ String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
+
+ // Randomly choose between register or open
+ if (random.nextBoolean()) {
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertNotNull("Index should not be null after register", index);
+
+ // If this path was previously registered, verify it's the same instance
+ IIndex firstInstance = firstRegistrations.get(resourcePath);
+ if (firstInstance != null) {
+ assertSame("Re-registration should return the same index instance", firstInstance,
+ index);
+ } else {
+ // First time registering this path, record the instance
+ firstRegistrations.putIfAbsent(resourcePath, index);
+ }
+
+ successCount.incrementAndGet();
+ } else {
+ try {
+ // Try to register first if not already registered
+ if (!firstRegistrations.containsKey(resourcePath)) {
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ firstRegistrations.putIfAbsent(resourcePath, index);
+ }
+
+ // Now open it
+ datasetLifecycleManager.open(resourcePath);
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
+ assertNotNull("Index should not be null after open", index);
+ assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
+ successCount.incrementAndGet();
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ errorCount.incrementAndGet();
+ }
+ }));
+ }
+
+ // Wait for all threads to complete
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ assertEquals("No unexpected errors should occur", 0, errorCount.get());
+ LOGGER.info("Completed testConcurrentRegisterAndOpen: {} successful operations", successCount.get());
+ assertTrue("Some operations should succeed", successCount.get() > 0);
+
+ // Verify all resources are now registered
+ for (String resourcePath : createdResourcePaths) {
+ IIndex index = firstRegistrations.get(resourcePath);
+ assertNotNull("All resources should be registered", index);
+
+ // Verify one final time that re-registration returns the same instance
+ IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertSame("Final re-registration should return the same index instance", index, reRegisteredIndex);
+ }
+ }
+
+ /**
+ * Tests concurrent register, open, and unregister operations.
+ * This test verifies that:
+ * 1. Multiple threads can concurrently perform all lifecycle operations (register, open, unregister)
+ * 2. Re-registration of an already registered index returns the same index instance
+ * 3. Resources can be properly unregistered after being opened and closed
+ * 4. Concurrent lifecycle operations do not interfere with each other
+ */
+ @Test
+ public void testConcurrentRegisterIfAbsentOpenAndUnregister() throws Exception {
+ LOGGER.info("Starting testConcurrentRegisterOpenAndUnregister with {} threads", NUM_THREADS);
+
+ final Map<String, IIndex> registeredInstances = new HashMap<>();
+ final AtomicInteger successCount = new AtomicInteger(0);
+ final AtomicInteger expectedErrorCount = new AtomicInteger(0);
+ final AtomicInteger unexpectedErrorCount = new AtomicInteger(0);
+
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < NUM_THREADS; i++) {
+ futures.add(executorService.submit(() -> {
+ try {
+ final Random random = new Random();
+ for (int j = 0; j < NUM_OPERATIONS_PER_THREAD; j++) {
+ // Pick a random resource path
+ String resourcePath = createdResourcePaths.get(random.nextInt(createdResourcePaths.size()));
+
+ try {
+ // Randomly choose between register, open, or unregister
+ int op = random.nextInt(3);
+ if (op == 0) {
+ // Register index
+ LOGGER.debug("Thread {} registering resource {}", Thread.currentThread().getId(),
+ resourcePath);
+
+ IIndex index = datasetLifecycleManager.get(resourcePath);
+ synchronized (registeredInstances) {
+ registeredInstances.put(resourcePath, index);
+ }
+
+ LOGGER.debug("Thread {} registered resource {}", Thread.currentThread().getId(),
+ resourcePath);
+ successCount.incrementAndGet();
+ } else if (op == 1) {
+ // Open index
+ LOGGER.debug("Thread {} opening resource {}", Thread.currentThread().getId(),
+ resourcePath);
+
+ IIndex index;
+ synchronized (registeredInstances) {
+ index = registeredInstances.get(resourcePath);
+ }
+
+ if (index != null) {
+ try {
+ datasetLifecycleManager.open(resourcePath);
+ LOGGER.debug("Thread {} opened resource {}", Thread.currentThread().getId(),
+ resourcePath);
+ successCount.incrementAndGet();
+ } catch (HyracksDataException e) {
+ if (e.getMessage() != null
+ && e.getMessage().contains("HYR0104: Index does not exist")) {
+ LOGGER.debug("Thread {} failed to open unregistered resource {}: {}",
+ Thread.currentThread().getId(), resourcePath, e.getMessage());
+ expectedErrorCount.incrementAndGet();
+ } else {
+ LOGGER.error("Thread {} failed to open resource {}: {}",
+ Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
+ unexpectedErrorCount.incrementAndGet();
+ }
+ }
+ }
+ } else {
+ // Unregister index
+ LOGGER.debug("Thread {} unregistering resource {}", Thread.currentThread().getId(),
+ resourcePath);
+
+ try {
+ datasetLifecycleManager.unregister(resourcePath);
+ LOGGER.debug("Thread {} unregistered resource {}", Thread.currentThread().getId(),
+ resourcePath);
+ synchronized (registeredInstances) {
+ registeredInstances.remove(resourcePath);
+ }
+ successCount.incrementAndGet();
+ } catch (HyracksDataException e) {
+ if (e.getMessage() != null
+ && (e.getMessage().contains("HYR0104: Index does not exist")
+ || e.getMessage().contains("HYR0105: Cannot drop in-use index"))) {
+ LOGGER.debug("Thread {} failed to unregister resource {}: {}",
+ Thread.currentThread().getId(), resourcePath, e.getMessage());
+ expectedErrorCount.incrementAndGet();
+ } else {
+ LOGGER.error("Thread {} failed to unregister resource {}: {}",
+ Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
+ unexpectedErrorCount.incrementAndGet();
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Thread {} encountered error on resource {}: {}",
+ Thread.currentThread().getId(), resourcePath, e.getMessage(), e);
+ unexpectedErrorCount.incrementAndGet();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Thread {} encountered unexpected error: {}", Thread.currentThread().getId(),
+ e.getMessage(), e);
+ unexpectedErrorCount.incrementAndGet();
+ }
+ }));
+ }
+
+ // Wait for all threads to complete
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ // Print final stats
+ int remainingRegistered = registeredInstances.size();
+ LOGGER.info(
+ "testConcurrentRegisterOpenAndUnregister completed - Success: {}, Expected Errors: {}, Unexpected Errors: {}, Remaining registered: {}",
+ successCount.get(), expectedErrorCount.get(), unexpectedErrorCount.get(), remainingRegistered);
+
+ // Check results
+ assertEquals("No unexpected errors should occur", 0, unexpectedErrorCount.get());
+ assertTrue("Some operations should succeed", successCount.get() > 0);
+ LOGGER.info("Expected errors occurred: {} - these are normal in concurrent environment",
+ expectedErrorCount.get());
+ }
+
+ /**
+ * Tests behavior when re-registering already registered indexes.
+ * This test verifies that:
+ * 1. Registering an index returns a valid index instance
+ * 2. Re-registering the same index returns the same instance (not a new one)
+ * 3. Concurrent re-registrations and opens work correctly
+ * 4. The identity of index instances is preserved throughout operations
+ */
+ @Test
+ public void testRegisterIfAbsentAlreadyRegisteredIndex() throws Exception {
+ LOGGER.info("Starting testRegisterAlreadyRegisteredIndex");
+
+ // Register all resources first
+ Map<String, IIndex> firstRegistrations = new HashMap<>();
+ for (String resourcePath : createdResourcePaths) {
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertNotNull("First registration should succeed", index);
+ firstRegistrations.put(resourcePath, index);
+
+ // Verify the returned index is the same as our mock index
+ MockIndex mockIndex = mockIndexes.get(resourcePath);
+ assertSame("Register should return our mock index for " + resourcePath, mockIndex, index);
+ }
+
+ // Try to register again - should return the same instances without re-registering
+ for (String resourcePath : createdResourcePaths) {
+ IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertNotNull("Re-registration should succeed", reRegisteredIndex);
+ // Verify we get back the same instance (no re-registration occurred)
+ assertSame("Re-registration should return the same index instance", firstRegistrations.get(resourcePath),
+ reRegisteredIndex);
+
+ // Double check it's still our mock index
+ MockIndex mockIndex = mockIndexes.get(resourcePath);
+ assertSame("Re-register should still return our mock index for " + resourcePath, mockIndex,
+ reRegisteredIndex);
+ }
+
+ // Now try concurrent open and re-register operations
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch completionLatch = new CountDownLatch(NUM_THREADS);
+ final AtomicInteger openSuccesses = new AtomicInteger(0);
+ final ConcurrentHashMap<String, Integer> reRegisterCounts = new ConcurrentHashMap<>();
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+
+ // Even threads try to open, odd threads try to re-register
+ if (threadId % 2 == 0) {
+ String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
+ datasetLifecycleManager.open(resourcePath);
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
+ assertNotNull("Index should not be null after open", index);
+ assertTrue("Index should be activated after open", ((MockIndex) index).isActivated());
+ openSuccesses.incrementAndGet();
+ } else {
+ String resourcePath = createdResourcePaths.get(threadId % createdResourcePaths.size());
+ IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertNotNull("Re-registration should succeed", reRegisteredIndex);
+ // Keep track of re-registrations
+ reRegisterCounts.compute(resourcePath, (k, v) -> v == null ? 1 : v + 1);
+ // Verify it's the same instance as the original registration
+ assertSame("Re-registration should return the same index instance",
+ firstRegistrations.get(resourcePath), reRegisteredIndex);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads
+ startLatch.countDown();
+
+ // Wait for completion
+ completionLatch.await(10, TimeUnit.SECONDS);
+
+ // Verify results
+ assertEquals("Half of threads should succeed in opening", NUM_THREADS / 2, openSuccesses.get());
+
+ // Verify that re-registration attempts occurred for each resource
+ for (String path : createdResourcePaths) {
+ // Some resources should have been re-registered multiple times
+ Integer reRegCount = reRegisterCounts.get(path);
+ if (reRegCount != null) {
+ LOGGER.info("Resource path {} was re-registered {} times", path, reRegCount);
+ }
+ }
+
+ LOGGER.info("Completed testRegisterAlreadyRegisteredIndex: {} successful opens", openSuccesses.get());
+ }
+
+ /**
+ * Tests concurrent opening of the same index by multiple threads.
+ * This test verifies that:
+ * 1. Multiple threads can concurrently open the same index without errors
+ * 2. The index is properly activated after being opened
+ * 3. Concurrent opens do not interfere with each other
+ */
+ @Test
+ public void testConcurrentOpenSameIndex() throws Exception {
+ LOGGER.info("Starting testConcurrentOpenSameIndex");
+
+ final int THREADS = 10;
+ final String RESOURCE_PATH = createdResourcePaths.get(0); // First created resource path
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch completionLatch = new CountDownLatch(THREADS);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ // Register the index first - REQUIRED before open
+ datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
+
+ // Create threads that will all open the same index
+ for (int i = 0; i < THREADS; i++) {
+ executorService.submit(() -> {
+ try {
+ startLatch.await();
+ datasetLifecycleManager.open(RESOURCE_PATH);
+ } catch (Exception e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Start all threads simultaneously
+ startLatch.countDown();
+
+ // Wait for all threads to complete
+ completionLatch.await(10, TimeUnit.SECONDS);
+
+ assertEquals("No errors should occur when opening the same index concurrently", 0, errors.get());
+
+ // Verify the index is actually open
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.get(RESOURCE_PATH);
+ assertNotNull("Index should be retrievable", index);
+ assertTrue("Index should be activated", ((MockIndex) index).isActivated());
+
+ LOGGER.info("Completed testConcurrentOpenSameIndex: index activated={}", ((MockIndex) index).isActivated());
+ }
+
+ /**
+ * Tests concurrent closing of an index after it has been opened.
+ * This test verifies that:
+ * 1. An index can be closed after being opened
+ * 2. Multiple threads can attempt to close the same index
+ * 3. The index is properly deactivated after being closed
+ */
+ @Test
+ public void testConcurrentCloseAfterOpen() throws Exception {
+ LOGGER.info("Starting testConcurrentCloseAfterOpen");
+
+ final String RESOURCE_PATH = createdResourcePaths.get(1); // Second created resource path
+ final CountDownLatch openLatch = new CountDownLatch(1);
+ final CountDownLatch closeLatch = new CountDownLatch(1);
+ final AtomicBoolean openSuccess = new AtomicBoolean(false);
+ final AtomicBoolean closeSuccess = new AtomicBoolean(false);
+
+ // Register the index first
+ datasetLifecycleManager.registerIfAbsent(RESOURCE_PATH, null);
+
+ // Thread to open the index
+ executorService.submit(() -> {
+ try {
+ datasetLifecycleManager.open(RESOURCE_PATH);
+ openSuccess.set(true);
+ openLatch.countDown();
+
+ // Wait a bit to simulate work with the open index
+ Thread.sleep(100);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Open operation failed: " + e.getMessage());
+ }
+ });
+
+ // Thread to close the index after it's opened
+ executorService.submit(() -> {
+ try {
+ openLatch.await(); // Wait for open to complete
+ datasetLifecycleManager.close(RESOURCE_PATH);
+ closeSuccess.set(true);
+ closeLatch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Close operation failed: " + e.getMessage());
+ }
+ });
+
+ // Wait for operations to complete
+ closeLatch.await(5, TimeUnit.SECONDS);
+
+ assertTrue("Open operation should succeed", openSuccess.get());
+ assertTrue("Close operation should succeed", closeSuccess.get());
+
+ LOGGER.info("Completed testConcurrentCloseAfterOpen: openSuccess={}, closeSuccess={}", openSuccess.get(),
+ closeSuccess.get());
+ }
+
+ /**
+ * Tests the identity and uniqueness of mock objects and resources.
+ * This test verifies that:
+ * 1. Each resource path has a unique MockIndex instance
+ * 2. Each LocalResource has a unique ID
+ * 3. The createInstance method returns the correct MockIndex for each resource path
+ * 4. Registration returns the expected MockIndex instance
+ *
+ * This test is crucial for ensuring the test infrastructure itself is correctly set up.
+ */
+ @Test
+ public void testMockIndexIdentity() throws HyracksDataException {
+ LOGGER.info("Starting testMockIndexIdentity");
+
+ // Verify that the mockIndexes map is correctly populated
+ assertTrue("Mock indexes should be created", !mockIndexes.isEmpty());
+ assertEquals("Should have created mocks for all resource paths", createdResourcePaths.size(),
+ mockIndexes.size());
+
+ // Verify each local resource has a unique ID
+ Set<Long> resourceIds = new HashSet<>();
+ for (LocalResource resource : mockResources.values()) {
+ long id = resource.getId();
+ if (!resourceIds.add(id)) {
+ fail("Duplicate resource ID found: " + id + " for path: " + resource.getPath());
+ }
+ }
+
+ // For each resource path, verify the mock setup
+ for (String resourcePath : createdResourcePaths) {
+ // Check if we have a mock index for this path
+ MockIndex expectedMockIndex = mockIndexes.get(resourcePath);
+ assertNotNull("Should have a mock index for " + resourcePath, expectedMockIndex);
+
+ // Get the local resource
+ LocalResource lr = mockResourceRepository.get(resourcePath);
+ assertNotNull("Should have a local resource for " + resourcePath, lr);
+
+ // Print resource ID for verification
+ LOGGER.info("Resource path: {}, ID: {}", resourcePath, lr.getId());
+
+ // Get the dataset resource
+ DatasetLocalResource datasetResource = (DatasetLocalResource) lr.getResource();
+ assertNotNull("Should have a dataset resource for " + resourcePath, datasetResource);
+
+ // Call createInstance and verify we get our mock index
+ IIndex createdIndex = datasetResource.createInstance(mockServiceContext);
+ assertSame("createInstance should return our mock index for " + resourcePath, expectedMockIndex,
+ createdIndex);
+
+ // Now register and verify we get the same mock index
+ IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertSame("Registered index should be our mock index for " + resourcePath, expectedMockIndex,
+ registeredIndex);
+
+ // Confirm the reference equality of these objects
+ LOGGER.info("Path: {}", resourcePath);
+ LOGGER.info(" Expected mock: {}", System.identityHashCode(expectedMockIndex));
+ LOGGER.info(" Created instance: {}", System.identityHashCode(createdIndex));
+ LOGGER.info(" Registered instance: {}", System.identityHashCode(registeredIndex));
+ }
+
+ LOGGER.info("Completed testMockIndexIdentity: verified {} resources", createdResourcePaths.size());
+ }
+
+ /**
+ * Tests that unregistering indexes works correctly, but only after they have been properly registered and opened.
+ * This test verifies that:
+ * 1. Indexes can be registered and opened successfully
+ * 2. All indexes are properly activated after opening
+ * 3. Indexes can be unregistered after being registered and opened
+ * 4. After unregistering, the indexes are no longer retrievable
+ * 5. The correct lifecycle sequence (register → open → unregister) is enforced
+ */
+ @Test
+ public void testUnregisterAfterRegisterIfAbsentAndOpen() throws Exception {
+ LOGGER.info("Starting testUnregisterAfterRegisterAndOpen");
+
+ Map<String, IIndex> registeredIndexes = new HashMap<>();
+ int totalIndexes = createdResourcePaths.size();
+ AtomicInteger successfulUnregisters = new AtomicInteger(0);
+
+ // Step 1: Register and open all indexes sequentially
+ for (String resourcePath : createdResourcePaths) {
+ LOGGER.debug("Registering and opening: {}", resourcePath);
+
+ // Register the index
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertNotNull("Index should be registered successfully", index);
+ registeredIndexes.put(resourcePath, index);
+
+ // After registration, we should have a DatasetResource
+ LocalResource resource = mockResources.get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Check if we have a DatasetResource in our map
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ if (datasetResource != null) {
+ LOGGER.debug("Found DatasetResource for dataset {}", datasetId);
+
+ // See if there's an operation tracker for this partition already
+ ILSMOperationTracker existingTracker = null;
+ try {
+ // Use reflection to get the datasetPrimaryOpTrackers map
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTrackersField.setAccessible(true);
+ Map<Integer, ILSMOperationTracker> opTrackers =
+ (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+ // Check if we already have a tracker for this partition
+ existingTracker = opTrackers.get(partition);
+ LOGGER.debug("Existing tracker for partition {}: {}", partition, existingTracker);
+ } catch (Exception e) {
+ LOGGER.error("Failed to access opTrackers field via reflection", e);
+ }
+ } else {
+ LOGGER.debug("No DatasetResource found for dataset {}", datasetId);
+ }
+
+ // Open the index
+ datasetLifecycleManager.open(resourcePath);
+
+ // Verify index is activated
+ MockIndex mockIndex = (MockIndex) index;
+ assertTrue("Index should be activated after opening: " + resourcePath, mockIndex.isActivated());
+
+ // Verify it's retrievable
+ long resourceId = resource.getId();
+ IIndex retrievedIndex = datasetLifecycleManager.getIndex(datasetId, resourceId);
+ assertSame("Retrieved index should be the same instance", index, retrievedIndex);
+ }
+
+ LOGGER.info("All {} indexes registered and opened successfully", totalIndexes);
+
+ // Step 2: Close and then unregister each index
+ for (String resourcePath : createdResourcePaths) {
+ LOGGER.debug("Closing and unregistering: {}", resourcePath);
+
+ // Verify the index is activated before closing
+ IIndex index = registeredIndexes.get(resourcePath);
+ MockIndex mockIndex = (MockIndex) index;
+ assertTrue("Index should be activated before closing: " + resourcePath, mockIndex.isActivated());
+
+ // Get resource information before closing
+ LocalResource resource = mockResources.get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Close the index first to ensure reference count goes to 0
+ LOGGER.debug("Closing index: {}", resourcePath);
+ datasetLifecycleManager.close(resourcePath);
+
+ // Brief pause to allow any asynchronous operations to complete
+ Thread.sleep(50);
+
+ // Get the operation tracker and verify it has 0 active operations
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ ILSMOperationTracker opTracker = mockIndex.getOperationTracker();
+
+ LOGGER.debug("Before unregister: Resource path={}, datasetId={}, partition={}, tracker={}", resourcePath,
+ datasetId, partition, opTracker);
+
+ // Unregister the index after it's closed
+ LOGGER.debug("Unregistering index: {}", resourcePath);
+ try {
+ datasetLifecycleManager.unregister(resourcePath);
+ successfulUnregisters.incrementAndGet();
+ LOGGER.debug("Successfully unregistered index: {}", resourcePath);
+ } catch (Exception e) {
+ LOGGER.error("Failed to unregister index: {}", resourcePath, e);
+ fail("Failed to unregister index: " + resourcePath + ": " + e.getMessage());
+ }
+
+ // Verify the index is no longer retrievable
+ try {
+ LocalResource resourceAfterUnregister = mockResources.get(resourcePath);
+ long resourceIdAfterUnregister = resourceAfterUnregister.getId();
+ IIndex retrievedIndexAfterUnregister =
+ datasetLifecycleManager.getIndex(datasetId, resourceIdAfterUnregister);
+ Assert.assertNull("Index should not be retrievable after unregister: " + resourcePath,
+ retrievedIndexAfterUnregister);
+ } catch (HyracksDataException e) {
+ // This is also an acceptable outcome if getIndex throws an exception for unregistered indexes
+ LOGGER.debug("Expected exception when retrieving unregistered index: {}", e.getMessage());
+ }
+ }
+
+ assertEquals("All indexes should be unregistered", totalIndexes, successfulUnregisters.get());
+ LOGGER.info("Completed testUnregisterAfterRegisterAndOpen: successfully unregistered {} indexes",
+ successfulUnregisters.get());
+ }
+
+ /**
+ * Tests that only one primary operation tracker is created per partition per dataset.
+ * This test verifies that:
+ * 1. When multiple indexes for the same dataset and partition are registered, they share the same operation tracker
+ * 2. Different partitions of the same dataset have different operation trackers
+ * 3. Different datasets have completely separate operation trackers
+ */
+ @Test
+ public void testPrimaryOperationTrackerSingularity() throws Exception {
+ LOGGER.info("Starting testPrimaryOperationTrackerSingularity");
+
+ // Maps to track operation trackers by dataset and partition
+ Map<Integer, Map<Integer, Object>> datasetToPartitionToTracker = new HashMap<>();
+ Map<String, IIndex> registeredIndexes = new HashMap<>();
+
+ // Step 1: Register all indexes first
+ for (String resourcePath : createdResourcePaths) {
+ LOGGER.debug("Registering index: {}", resourcePath);
+
+ // Register the index
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, null);
+ assertNotNull("Index should be registered successfully", index);
+ registeredIndexes.put(resourcePath, index);
+
+ // Get dataset and partition information
+ LocalResource resource = mockResources.get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Check if we have a DatasetResource in our map
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ assertNotNull("DatasetResource should be created for dataset " + datasetId, datasetResource);
+
+ // At this point, operation tracker might not exist yet since it's created lazily during open()
+ try {
+ // Use reflection to get the datasetPrimaryOpTrackers map
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTrackersField.setAccessible(true);
+ Map<Integer, ILSMOperationTracker> opTrackers =
+ (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+ // Check if tracker already exists (might be null)
+ Object existingTracker = opTrackers.get(partition);
+ LOGGER.debug("Before open(): Tracker for dataset {} partition {}: {}", datasetId, partition,
+ existingTracker);
+ } catch (Exception e) {
+ LOGGER.error("Failed to access opTrackers field via reflection", e);
+ }
+ }
+
+ // Step 2: Open all indexes to trigger operation tracker creation
+ for (String resourcePath : createdResourcePaths) {
+ LOGGER.debug("Opening index: {}", resourcePath);
+
+ // Open the index to trigger operation tracker creation
+ datasetLifecycleManager.open(resourcePath);
+
+ // Get dataset and partition information
+ LocalResource resource = mockResources.get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Now the operation tracker should exist
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ Object opTracker = null;
+ try {
+ // Use reflection to get the datasetPrimaryOpTrackers map
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTrackersField.setAccessible(true);
+ Map<Integer, ILSMOperationTracker> opTrackers =
+ (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+ // Get the tracker for this partition
+ opTracker = opTrackers.get(partition);
+ assertNotNull("Operation tracker should exist after open() for partition " + partition, opTracker);
+
+ LOGGER.debug("After open(): Found tracker for dataset {} partition {}: {}", datasetId, partition,
+ System.identityHashCode(opTracker));
+ } catch (Exception e) {
+ LOGGER.error("Failed to access opTrackers field via reflection", e);
+ fail("Failed to access operation trackers: " + e.getMessage());
+ }
+
+ // Store the tracker in our map for later comparison
+ datasetToPartitionToTracker.computeIfAbsent(datasetId, k -> new HashMap<>()).putIfAbsent(partition,
+ opTracker);
+ }
+
+ // Step 3: Create and register secondary indexes for each dataset/partition
+ List<String> secondaryPaths = new ArrayList<>();
+ for (String resourcePath : createdResourcePaths) {
+ // Create a "secondary index" path by appending "-secondary" to the resource path
+ String secondaryPath = resourcePath + "-secondary";
+ secondaryPaths.add(secondaryPath);
+
+ // Create a mock resource for this secondary index
+ LocalResource originalResource = mockResources.get(resourcePath);
+ DatasetLocalResource originalDatasetLocalResource = (DatasetLocalResource) originalResource.getResource();
+ int datasetId = originalDatasetLocalResource.getDatasetId();
+ int partition = originalDatasetLocalResource.getPartition();
+
+ // Set up a new mock resource with the same dataset and partition but a different path
+ setupMockResource(secondaryPath, datasetId, partition, originalResource.getId() + 10000);
+
+ LOGGER.debug("Registering secondary index: {}", secondaryPath);
+
+ // Register the secondary index
+ IIndex secondaryIndex = datasetLifecycleManager.registerIfAbsent(secondaryPath, null);
+ assertNotNull("Secondary index should be registered successfully", secondaryIndex);
+ registeredIndexes.put(secondaryPath, secondaryIndex);
+ }
+
+ // Step 4: Open the secondary indexes to trigger operation tracker reuse
+ for (String secondaryPath : secondaryPaths) {
+ LOGGER.debug("Opening secondary index: {}", secondaryPath);
+
+ // Open the secondary index
+ datasetLifecycleManager.open(secondaryPath);
+
+ // Get dataset and partition information
+ LocalResource resource = mockResources.get(secondaryPath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Get the operation tracker after opening
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ Object secondaryOpTracker = null;
+ try {
+ // Use reflection to get the datasetPrimaryOpTrackers map
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTrackersField.setAccessible(true);
+ Map<Integer, ILSMOperationTracker> opTrackers =
+ (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+ // Get the tracker for this partition
+ secondaryOpTracker = opTrackers.get(partition);
+ assertNotNull("Operation tracker should exist for secondary index partition " + partition,
+ secondaryOpTracker);
+
+ LOGGER.debug("After opening secondary: Found tracker for dataset {} partition {}: {}", datasetId,
+ partition, System.identityHashCode(secondaryOpTracker));
+ } catch (Exception e) {
+ LOGGER.error("Failed to access opTrackers field via reflection", e);
+ fail("Failed to access operation trackers: " + e.getMessage());
+ }
+
+ // Verify this is the same tracker as the one used by the primary index
+ Object originalTracker = datasetToPartitionToTracker.get(datasetId).get(partition);
+ assertSame("Operation tracker should be reused for the same dataset/partition", originalTracker,
+ secondaryOpTracker);
+ }
+
+ // Step 5: Verify that different partitions of the same dataset have different operation trackers
+ for (Map.Entry<Integer, Map<Integer, Object>> datasetEntry : datasetToPartitionToTracker.entrySet()) {
+ int datasetId = datasetEntry.getKey();
+ Map<Integer, Object> partitionTrackers = datasetEntry.getValue();
+
+ if (partitionTrackers.size() > 1) {
+ LOGGER.debug("Verifying different trackers for different partitions in dataset {}", datasetId);
+
+ // Collect all the tracker instances for this dataset
+ Set<Object> uniqueTrackers = new HashSet<>(partitionTrackers.values());
+
+ // The number of unique trackers should equal the number of partitions
+ assertEquals("Each partition should have its own operation tracker", partitionTrackers.size(),
+ uniqueTrackers.size());
+ }
+ }
+
+ // Step 6: Verify that different datasets have different operation trackers for the same partition
+ if (datasetToPartitionToTracker.size() > 1) {
+ for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
+ final int partitionId = partition;
+
+ // Collect all trackers for this partition across different datasets
+ List<Object> trackersForPartition = datasetToPartitionToTracker.entrySet().stream()
+ .filter(entry -> entry.getValue().containsKey(partitionId))
+ .map(entry -> entry.getValue().get(partitionId)).collect(Collectors.toList());
+
+ if (trackersForPartition.size() > 1) {
+ LOGGER.debug("Verifying different trackers across datasets for partition {}", partitionId);
+
+ // All trackers should be unique (no sharing between datasets)
+ Set<Object> uniqueTrackers = new HashSet<>(trackersForPartition);
+ assertEquals("Each dataset should have its own operation tracker for partition " + partitionId,
+ trackersForPartition.size(), uniqueTrackers.size());
+ }
+ }
+ }
+
+ // Step 7: Clean up - close all indexes
+ for (String path : new ArrayList<>(registeredIndexes.keySet())) {
+ try {
+ datasetLifecycleManager.close(path);
+ } catch (Exception e) {
+ LOGGER.error("Error closing index {}", path, e);
+ }
+ }
+
+ LOGGER.info(
+ "Completed testPrimaryOperationTrackerSingularity: verified unique trackers for each dataset/partition combination");
+ }
+
+ /**
+ * Tests that resources cannot be opened after unregistering unless re-registered first.
+ * This test verifies:
+ * 1. Resources must be registered before they can be opened
+ * 2. After unregistering, resources cannot be opened until re-registered
+ * 3. Re-registration of an unregistered resource works correctly
+ */
+ @Test
+ public void testCannotOpenUnregisteredResource() throws Exception {
+ LOGGER.info("Starting testCannotOpenUnregisteredResource");
+
+ String resourcePath = createdResourcePaths.get(0);
+
+ // Register resource first
+ IIndex index = registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+ LOGGER.debug("Registered resource {}", resourcePath);
+
+ // Unregister it
+ datasetLifecycleManager.unregister(resourcePath);
+ LOGGER.debug("Unregistered resource {}", resourcePath);
+
+ // Now try to open it - should fail
+ try {
+ datasetLifecycleManager.open(resourcePath);
+ fail("Should not be able to open an unregistered resource");
+ } catch (HyracksDataException e) {
+ LOGGER.debug("Caught expected exception: {}", e.getMessage());
+ assertTrue("Exception should mention index not existing", e.getMessage().contains("does not exist"));
+ }
+
+ // Re-register should work
+ MockIndex mockIndex = mockIndexes.get(resourcePath);
+ IIndex reRegisteredIndex = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
+ assertNotNull("Index should be re-registered successfully", reRegisteredIndex);
+ LOGGER.debug("Re-registered resource {}", resourcePath);
+
+ // Now open should work
+ datasetLifecycleManager.open(resourcePath);
+ LOGGER.debug("Successfully opened resource after re-registering");
+
+ LOGGER.info("Completed testCannotOpenUnregisteredResource");
+ }
+
+ /**
+ * Tests the full lifecycle of operation trackers to ensure they're properly created,
+ * maintained throughout the index lifecycle, and cleaned up at the appropriate time.
+ * This test verifies that:
+ * 1. Operation trackers are created when indexes are opened for the first time
+ * 2. Operation trackers remain stable during index usage
+ * 3. Operation trackers are properly shared among indexes of the same dataset/partition
+ * 4. Operation trackers are not prematurely nullified or cleared
+ * 5. Operation trackers are correctly removed when all indexes of a dataset/partition are unregistered
+ */
+ @Test
+ public void testOperationTrackerLifecycle() throws Exception {
+ // Register primary index
+ String dsName = "ds0";
+ int dsId = 101; // Updated from 0 to 101 to match our new datasetId scheme
+ int partition = 0;
+ String primaryIndexName = dsName;
+ String secondaryIndexName = dsName + "_idx";
+ String primaryResourcePath = getResourcePath(partition, dsName, 0, primaryIndexName);
+ String secondaryResourcePath = getResourcePath(partition, dsName, 1, secondaryIndexName);
+
+ LOGGER.debug("Registering primary index at {}", primaryResourcePath);
+ MockIndex mockPrimaryIndex = mockIndexes.get(primaryResourcePath);
+ IIndex registeredIndex = datasetLifecycleManager.registerIfAbsent(primaryResourcePath, mockPrimaryIndex);
+ assertNotNull("Index should be registered successfully", registeredIndex);
+
+ // Get the dataset resource and check if it has been created
+ DatasetResource datasetResource = datasetResourceMap.get(dsId);
+ assertNotNull("Dataset resource should be created during registration", datasetResource);
+
+ // Verify there's no operation tracker before opening
+ ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
+
+ // Open primary index which should create and register an operation tracker
+ LOGGER.debug("Opening primary index at {}", primaryResourcePath);
+ datasetLifecycleManager.open(primaryResourcePath);
+
+ // Verify operation tracker exists after opening
+ ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Operation tracker after primary open: {}", trackerAfterOpen);
+ assertNotNull("Operation tracker should be created after opening primary index", trackerAfterOpen);
+
+ // Verify the tracker is from our mock index
+ MockIndex mockIndex = mockIndexes.get(primaryResourcePath);
+ assertTrue("Mock index should be activated after open", mockIndex.isActivated());
+
+ // Compare with the mock tracker from the index
+ ILSMOperationTracker mockTracker = mockIndex.getOperationTracker();
+ LOGGER.debug("Mock tracker from index: {}", mockTracker);
+ assertEquals("Operation tracker should be the one from the mock index", mockTracker, trackerAfterOpen);
+
+ // Open secondary index which should reuse the same operation tracker
+ LOGGER.debug("Registering secondary index at {}", secondaryResourcePath);
+ MockIndex mockSecondaryIndex = mockIndexes.get(secondaryResourcePath);
+ datasetLifecycleManager.registerIfAbsent(secondaryResourcePath, mockSecondaryIndex);
+
+ LOGGER.debug("Opening secondary index at {}", secondaryResourcePath);
+ datasetLifecycleManager.open(secondaryResourcePath);
+
+ // Verify operation tracker is still the same after opening secondary
+ ILSMOperationTracker trackerAfterSecondaryOpen = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Operation tracker after secondary open: {}", trackerAfterSecondaryOpen);
+ assertNotNull("Operation tracker should still exist after opening secondary index", trackerAfterSecondaryOpen);
+ assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterSecondaryOpen);
+
+ // Close primary index - should keep the tracker as secondary is still open
+ LOGGER.debug("Closing primary index at {}", primaryResourcePath);
+ datasetLifecycleManager.close(primaryResourcePath);
+
+ // Verify operation tracker still exists
+ ILSMOperationTracker trackerAfterPrimaryClose = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Operation tracker after primary close: {}", trackerAfterPrimaryClose);
+ assertNotNull("Operation tracker should still exist after closing primary index", trackerAfterPrimaryClose);
+ assertEquals("Should still be the same operation tracker", trackerAfterOpen, trackerAfterPrimaryClose);
+
+ // Close secondary index - should remove the tracker as all indexes are closed
+ LOGGER.debug("Closing secondary index at {}", secondaryResourcePath);
+ datasetLifecycleManager.close(secondaryResourcePath);
+
+ // Verify operation tracker is removed
+ ILSMOperationTracker trackerAfterSecondaryClose = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Operation tracker after secondary close: {}", trackerAfterSecondaryClose);
+
+ // Unregister indexes
+ LOGGER.debug("Unregistering primary index at {}", primaryResourcePath);
+ datasetLifecycleManager.unregister(primaryResourcePath);
+
+ LOGGER.debug("Unregistering secondary index at {}", secondaryResourcePath);
+ datasetLifecycleManager.unregister(secondaryResourcePath);
+
+ // Verify dataset resource is removed
+ DatasetResource datasetResourceAfterUnregister = datasetResourceMap.get(dsId);
+ LOGGER.debug("Dataset resource after unregister: {}", datasetResourceAfterUnregister);
+ }
+
+ /**
+ * Helper method to get the operation tracker for a dataset/partition.
+ * Uses reflection to access the private datasetPrimaryOpTrackers map in DatasetResource.
+ */
+ private ILSMOperationTracker getOperationTracker(DatasetResource datasetResource, int partition) {
+ try {
+ // Access the datasetPrimaryOpTrackers map through reflection
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTrackersField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<Integer, ILSMOperationTracker> opTrackers =
+ (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+ // Return the operation tracker for this partition, if it exists
+ if (opTrackers != null) {
+ ILSMOperationTracker tracker = opTrackers.get(partition);
+ if (tracker != null) {
+ // Make sure we're getting a PrimaryIndexOperationTracker
+ if (!(tracker instanceof PrimaryIndexOperationTracker)) {
+ LOGGER.warn("Tracker is not a PrimaryIndexOperationTracker: {}", tracker.getClass().getName());
+ }
+ }
+ return tracker;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to get operation tracker via reflection", e);
+ }
+ return null;
+ }
+
+ /**
+ * Tests that operation trackers are properly attached to indexes and survive open and close operations.
+ * This test verifies:
+ * 1. Operation trackers are properly created and associated with indexes
+ * 2. Operation trackers remain valid during the index lifecycle
+ * 3. Multiple opens and closes don't invalidate the operation tracker
+ */
+ @Test
+ public void testOperationTrackerAttachment() throws Exception {
+ LOGGER.info("Starting testOperationTrackerAttachment");
+
+ // Choose a specific resource path to test with
+ String resourcePath = createdResourcePaths.get(0);
+
+ // Get dataset and partition information for this resource
+ LocalResource resource = mockResources.get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Register the resource first using our helper
+ IIndex index = registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+ LOGGER.debug("Registered resource {}", resourcePath);
+
+ // Get the dataset resource
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ assertNotNull("DatasetResource should exist after registration", datasetResource);
+
+ // Verify no operation tracker exists before opening
+ ILSMOperationTracker trackerBeforeOpen = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Operation tracker before open: {}", trackerBeforeOpen);
+
+ // Open the index - this should create and attach an operation tracker
+ datasetLifecycleManager.open(resourcePath);
+ LOGGER.debug("Opened resource {}", resourcePath);
+
+ // Verify the operation tracker exists and is attached to the MockIndex
+ ILSMOperationTracker trackerAfterOpen = getOperationTracker(datasetResource, partition);
+ assertNotNull("Operation tracker should exist after open", trackerAfterOpen);
+ LOGGER.debug("Operation tracker after open: {}", trackerAfterOpen);
+
+ // Get the mock index and verify its tracker matches
+ MockIndex mockIndex = mockIndexes.get(resourcePath);
+ assertTrue("Mock index should be activated", mockIndex.isActivated());
+
+ // Verify the mock index has the same operation tracker
+ ILSMOperationTracker indexTracker = mockIndex.getOperationTracker();
+ assertSame("Mock index should have the same operation tracker", trackerAfterOpen, indexTracker);
+
+ // Close and unregister for cleanup
+ datasetLifecycleManager.close(resourcePath);
+ datasetLifecycleManager.unregister(resourcePath);
+
+ LOGGER.info("Completed testOperationTrackerAttachment");
+ }
+
+ /**
+ * A mock implementation of ILSMIndex for testing
+ */
+ private static class MockIndex implements ILSMIndex {
+ private final String resourcePath;
+ private final int datasetId;
+ private boolean activated = false;
+ // Create a mock of the specific PrimaryIndexOperationTracker class instead of the generic interface
+ private PrimaryIndexOperationTracker mockTracker = mock(PrimaryIndexOperationTracker.class);
+ private final Map<Integer, DatasetResource> datasetResourceMap;
+
+ public MockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
+ this.resourcePath = resourcePath;
+ this.datasetId = datasetId;
+ this.datasetResourceMap = datasetResourceMap;
+ }
+
+ @Override
+ public void create() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void activate() {
+ LOGGER.debug("Activating {}", this);
+ activated = true;
+ }
+
+ @Override
+ public void clear() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void deactivate() throws HyracksDataException {
+ LOGGER.debug("Deactivating {}", this);
+ activated = false;
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void purge() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void deactivate(boolean flushOnExit) {
+ LOGGER.debug("Deactivating {} with flushOnExit={}", this, flushOnExit);
+ activated = false;
+ }
+
+ @Override
+ public ILSMIndexAccessor createAccessor(IIndexAccessParameters iap) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public void validate() throws HyracksDataException {
+
+ }
+
+ @Override
+ public IBufferCache getBufferCache() {
+ return null;
+ }
+
+ @Override
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, IPageWriteCallback callback) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public int getNumOfFilterFields() {
+ return 0;
+ }
+
+ public boolean isActivated() {
+ return activated;
+ }
+
+ @Override
+ public String toString() {
+ // Ensure this matches exactly the resourcePath used for registration
+ return resourcePath;
+ }
+
+ @Override
+ public boolean isCurrentMutableComponentEmpty() {
+ return true;
+ }
+
+ @Override
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
+ IReplicationJob.ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
+
+ }
+
+ @Override
+ public boolean isMemoryComponentsAllocated() {
+ return false;
+ }
+
+ @Override
+ public void allocateMemoryComponents() throws HyracksDataException {
+
+ }
+
+ @Override
+ public ILSMMemoryComponent getCurrentMemoryComponent() {
+ return null;
+ }
+
+ @Override
+ public int getCurrentMemoryComponentIndex() {
+ return 0;
+ }
+
+ @Override
+ public List<ILSMMemoryComponent> getMemoryComponents() {
+ return List.of();
+ }
+
+ @Override
+ public boolean isDurable() {
+ return false;
+ }
+
+ @Override
+ public void updateFilter(ILSMIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
+
+ }
+
+ @Override
+ public ILSMDiskComponent createBulkLoadTarget() throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public int getNumberOfAllMemoryComponents() {
+ return 0;
+ }
+
+ @Override
+ public ILSMHarness getHarness() {
+ return null;
+ }
+
+ @Override
+ public String getIndexIdentifier() {
+ return "";
+ }
+
+ @Override
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, Map<String, Object> parameters) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public void resetCurrentComponentIndex() {
+
+ }
+
+ @Override
+ public IIndexDiskCacheManager getDiskCacheManager() {
+ return null;
+ }
+
+ @Override
+ public void scheduleCleanup(List<ILSMDiskComponent> inactiveDiskComponents) throws HyracksDataException {
+
+ }
+
+ @Override
+ public boolean isAtomic() {
+ return false;
+ }
+
+ @Override
+ public void commit() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+
+ }
+
+ @Override
+ public ILSMMergePolicy getMergePolicy() {
+ return null;
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ // This is the key method that DatasetLifecycleManager calls during open()
+ // to get the operation tracker and register it
+
+ LOGGER.debug("getOperationTracker() called for index: {}", resourcePath);
+
+ // Get dataset ID and partition from the resource path
+ int partition = -1;
+
+ // Extract partition from resource path (storage/partition_X/...)
+ String[] parts = resourcePath.split("/");
+ for (int i = 0; i < parts.length; i++) {
+ if (parts[i].startsWith("partition_")) {
+ try {
+ partition = Integer.parseInt(parts[i].substring("partition_".length()));
+ break;
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Failed to parse partition number from {}", parts[i]);
+ }
+ }
+ }
+
+ if (partition != -1) {
+ // Find the dataset resource from our map
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ if (datasetResource != null) {
+ try {
+ // Access the datasetPrimaryOpTrackers map through reflection
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTrackersField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<Integer, ILSMOperationTracker> opTrackers =
+ (Map<Integer, ILSMOperationTracker>) opTrackersField.get(datasetResource);
+
+ // Manual registration - simulate what DatasetLifecycleManager would do
+ if (opTrackers != null && !opTrackers.containsKey(partition)) {
+ LOGGER.debug(
+ "Directly adding tracker to datasetPrimaryOpTrackers for dataset {}, partition {}: {}",
+ datasetId, partition, mockTracker);
+ opTrackers.put(partition, mockTracker);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to access datasetPrimaryOpTrackers via reflection", e);
+ }
+ }
+ }
+
+ return mockTracker;
+ }
+
+ @Override
+ public ILSMIOOperationCallback getIOOperationCallback() {
+ return null;
+ }
+
+ @Override
+ public List<ILSMDiskComponent> getDiskComponents() {
+ return List.of();
+ }
+
+ @Override
+ public boolean isPrimaryIndex() {
+ return false;
+ }
+
+ @Override
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException {
+
+ }
+
+ @Override
+ public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException {
+
+ }
+
+ @Override
+ public ILSMIOOperation createFlushOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public ILSMIOOperation createMergeOperation(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public ILSMDiskComponent merge(ILSMIOOperation operation) throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void subsumeMergedComponents(ILSMDiskComponent newComponent, List<ILSMComponent> mergedComponents)
+ throws HyracksDataException {
+
+ }
+
+ @Override
+ public void changeMutableComponent() {
+
+ }
+
+ @Override
+ public void changeFlushStatusForCurrentMutableCompoent(boolean needsFlush) {
+
+ }
+
+ @Override
+ public boolean hasFlushRequestForCurrentMutableComponent() {
+ return false;
+ }
+
+ @Override
+ public void getOperationalComponents(ILSMIndexOperationContext ctx) throws HyracksDataException {
+
+ }
+
+ @Override
+ public List<ILSMDiskComponent> getInactiveDiskComponents() {
+ return List.of();
+ }
+
+ @Override
+ public void addInactiveDiskComponent(ILSMDiskComponent diskComponent) {
+
+ }
+
+ @Override
+ public List<ILSMMemoryComponent> getInactiveMemoryComponents() {
+ return List.of();
+ }
+
+ @Override
+ public void addInactiveMemoryComponent(ILSMMemoryComponent memoryComponent) {
+
+ }
+
+ // Additional method implementations required by the interface
+ // would be implemented here with minimal functionality
+
+ /**
+ * Sets the operation tracker for this mock index.
+ * Added to support DatasetLifecycleManagerLazyRecoveryTest use cases.
+ */
+ public void setOperationTracker(PrimaryIndexOperationTracker tracker) {
+ this.mockTracker = tracker;
+ }
+ }
+
+ /**
+ * Enhanced version of the MockIndex implementation that provides better
+ * tracking of operation tracker state changes.
+ */
+ private static class EnhancedMockIndex extends MockIndex {
+ private PrimaryIndexOperationTracker assignedTracker;
+ private final List<String> trackerEvents = new ArrayList<>();
+
+ public EnhancedMockIndex(String resourcePath, int datasetId, Map<Integer, DatasetResource> datasetResourceMap) {
+ super(resourcePath, datasetId, datasetResourceMap);
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ PrimaryIndexOperationTracker tracker = (PrimaryIndexOperationTracker) super.getOperationTracker();
+
+ if (assignedTracker == null) {
+ assignedTracker = tracker;
+ trackerEvents.add("Initial tracker assignment: " + tracker);
+ } else if (assignedTracker != tracker) {
+ trackerEvents.add("Tracker changed from " + assignedTracker + " to " + tracker);
+ assignedTracker = tracker;
+ }
+
+ return tracker;
+ }
+
+ public List<String> getTrackerEvents() {
+ return trackerEvents;
+ }
+ }
+
+ /**
+ * Tests specific race conditions that could lead to null operation trackers during unregistration.
+ * This test verifies:
+ * 1. Concurrent registration, opening, and unregistration don't create null operation trackers
+ * 2. The operation tracker remains valid throughout concurrent operations
+ * 3. Operation tracker cleanup occurs only when appropriate
+ */
+ @Test
+ public void testRaceConditionsWithOperationTracker() throws Exception {
+ LOGGER.info("Starting testRaceConditionsWithOperationTracker");
+
+ // Create a resource path for testing
+ String resourcePath = createdResourcePaths.get(0);
+
+ // Get dataset and partition information for this resource
+ LocalResource resource = mockResources.get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partition = datasetLocalResource.getPartition();
+
+ // Setup an enhanced mock index for this test to track operation tracker events
+ EnhancedMockIndex enhancedMockIndex = new EnhancedMockIndex(resourcePath, datasetId, datasetResourceMap);
+ mockIndexes.put(resourcePath, enhancedMockIndex);
+
+ // Register the resource to make it available
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
+ assertNotNull("Index should be registered successfully", index);
+ assertSame("Should get our enhanced mock index", enhancedMockIndex, index);
+
+ // Create multiple threads that will perform random operations on the same resource
+ final int NUM_CONCURRENT_THREADS = 10;
+ final int OPERATIONS_PER_THREAD = 20;
+ final CyclicBarrier startBarrier = new CyclicBarrier(NUM_CONCURRENT_THREADS);
+ final CountDownLatch completionLatch = new CountDownLatch(NUM_CONCURRENT_THREADS);
+
+ for (int i = 0; i < NUM_CONCURRENT_THREADS; i++) {
+ final int threadId = i;
+ executorService.submit(() -> {
+ try {
+ startBarrier.await();
+ Random random = new Random();
+
+ for (int op = 0; op < OPERATIONS_PER_THREAD; op++) {
+ // Perform a random operation:
+ // 0 = check index existence
+ // 1 = open
+ // 2 = close
+ // 3 = register
+ // 4 = check operation tracker
+ int operation = random.nextInt(5);
+
+ switch (operation) {
+ case 0:
+ // Check if index exists
+ IIndex existingIndex = datasetLifecycleManager.get(resourcePath);
+ LOGGER.debug("Thread {} checked index existence: {}", threadId, existingIndex != null);
+ break;
+ case 1:
+ // Open the index
+ try {
+ datasetLifecycleManager.open(resourcePath);
+ LOGGER.debug("Thread {} opened index", threadId);
+ } catch (Exception e) {
+ // This is expected occasionally since the resource might be unregistered
+ LOGGER.debug("Thread {} failed to open index: {}", threadId, e.getMessage());
+ }
+ break;
+ case 2:
+ // Close the index
+ try {
+ datasetLifecycleManager.close(resourcePath);
+ LOGGER.debug("Thread {} closed index", threadId);
+ } catch (Exception e) {
+ // This is expected occasionally
+ LOGGER.debug("Thread {} failed to close index: {}", threadId, e.getMessage());
+ }
+ break;
+ case 3:
+ // Register or re-register the index
+ try {
+ IIndex reregisteredIndex =
+ datasetLifecycleManager.registerIfAbsent(resourcePath, enhancedMockIndex);
+ LOGGER.debug("Thread {} registered index: {}", threadId, reregisteredIndex != null);
+ } catch (Exception e) {
+ LOGGER.debug("Thread {} failed to register index: {}", threadId, e.getMessage());
+ }
+ break;
+ case 4:
+ // Check operation tracker
+ try {
+ // Get the dataset resource
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ if (datasetResource != null) {
+ // Get and verify the operation tracker
+ ILSMOperationTracker tracker = getOperationTracker(datasetResource, partition);
+ if (tracker instanceof PrimaryIndexOperationTracker) {
+ LOGGER.debug("Thread {} found valid PrimaryIndexOperationTracker: {}",
+ threadId, tracker);
+ } else if (tracker != null) {
+ LOGGER.warn("Thread {} found non-PrimaryIndexOperationTracker: {}",
+ threadId, tracker.getClass().getName());
+ } else {
+ LOGGER.debug("Thread {} found no operation tracker", threadId);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Thread {} error checking operation tracker: {}", threadId,
+ e.getMessage());
+ }
+ break;
+ }
+
+ // Small delay to increase chance of race conditions
+ Thread.sleep(random.nextInt(5));
+ }
+
+ LOGGER.debug("Thread {} completed all operations", threadId);
+ completionLatch.countDown();
+ } catch (Exception e) {
+ LOGGER.error("Thread {} error during test: {}", threadId, e.getMessage(), e);
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ // Wait for all threads to complete
+ boolean allCompleted = completionLatch.await(30, TimeUnit.SECONDS);
+ if (!allCompleted) {
+ LOGGER.warn("Not all threads completed in time");
+ }
+
+ // Get the tracker events to verify behavior
+ List<String> trackerEvents = enhancedMockIndex.getTrackerEvents();
+ LOGGER.debug("Tracker events recorded: {}", trackerEvents.size());
+ for (String event : trackerEvents) {
+ LOGGER.debug("Tracker event: {}", event);
+ }
+
+ // Verify the index is still registered at the end
+ IIndex finalIndex = datasetLifecycleManager.get(resourcePath);
+ if (finalIndex != null) {
+ // Get the final operation tracker state if the index is still registered
+ DatasetResource datasetResource = datasetResourceMap.get(datasetId);
+ if (datasetResource != null) {
+ ILSMOperationTracker finalTracker = getOperationTracker(datasetResource, partition);
+ LOGGER.debug("Final operation tracker state: {}", finalTracker);
+
+ // If there's a valid tracker, it should be a PrimaryIndexOperationTracker
+ if (finalTracker != null) {
+ assertTrue("Final tracker should be a PrimaryIndexOperationTracker",
+ finalTracker instanceof PrimaryIndexOperationTracker);
+ }
+ }
+ }
+
+ LOGGER.info("Completed testRaceConditionsWithOperationTracker");
+ }
+
+ // Helper method to ensure consistent register approach
+ public IIndex registerIfAbsentIndex(String resourcePath) throws HyracksDataException {
+ MockIndex mockIndex = mockIndexes.get(resourcePath);
+ IIndex index = datasetLifecycleManager.registerIfAbsent(resourcePath, mockIndex);
+ LOGGER.debug("Registered {} with mock index {}", resourcePath, mockIndex);
+ return index;
+ }
+
+ @Test
+ public void testBalancedOpenCloseUnregister() throws Exception {
+ LOGGER.info("Starting testBalancedOpenCloseUnregister");
+
+ // Select a resource to test with
+ String resourcePath = createdResourcePaths.get(0);
+
+ // Register the index
+ IIndex index = registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+ LOGGER.info("Registered index: {}", resourcePath);
+
+ // Number of times to open/close
+ final int numOperations = 5;
+
+ // Open the index multiple times
+ for (int i = 0; i < numOperations; i++) {
+ datasetLifecycleManager.open(resourcePath);
+ LOGGER.info("Opened index {} (#{}/{})", resourcePath, i + 1, numOperations);
+ }
+
+ // Close the index the same number of times
+ for (int i = 0; i < numOperations; i++) {
+ datasetLifecycleManager.close(resourcePath);
+ LOGGER.info("Closed index {} (#{}/{})", resourcePath, i + 1, numOperations);
+ }
+
+ // At this point, reference count should be balanced and unregistration should succeed
+ try {
+ datasetLifecycleManager.unregister(resourcePath);
+ LOGGER.info("Successfully unregistered index {} after balanced open/close operations", resourcePath);
+
+ // After unregistration, simulate real behavior by making the repository return null for this path
+ // This is what would happen in a real environment - the resource would be removed from storage
+ mockResources.remove(resourcePath);
+ LOGGER.info("Removed resource {} from mock resources", resourcePath);
+ } catch (HyracksDataException e) {
+ fail("Failed to unregister index after balanced open/close: " + e.getMessage());
+ }
+
+ // Verify the index is actually unregistered by attempting to get it
+ IIndex retrievedIndex = datasetLifecycleManager.get(resourcePath);
+ assertNull("Index should no longer be registered after unregistration", retrievedIndex);
+
+ // Verify the resource is no longer in our mock repository
+ LocalResource retrievedResource = mockResourceRepository.get(resourcePath);
+ assertNull("Resource should be removed from repository after unregistration", retrievedResource);
+
+ // Try to open the unregistered index - should fail
+ boolean openFailed = false;
+ try {
+ datasetLifecycleManager.open(resourcePath);
+ } catch (HyracksDataException e) {
+ openFailed = true;
+ LOGGER.info("As expected, failed to open unregistered index: {}", e.getMessage());
+ assertTrue("Error should indicate the index doesn't exist",
+ e.getMessage().contains("Index does not exist"));
+ }
+ assertTrue("Opening an unregistered index should fail", openFailed);
+ }
+
+ // Add these accessor methods at the end of the class
+
+ /**
+ * Returns the dataset lifecycle manager for testing
+ */
+ public DatasetLifecycleManager getDatasetLifecycleManager() {
+ return datasetLifecycleManager;
+ }
+
+ /**
+ * Returns the mock service context for testing
+ */
+ public INCServiceContext getMockServiceContext() {
+ return mockServiceContext;
+ }
+
+ /**
+ * Returns the list of created resource paths
+ */
+ public List<String> getCreatedResourcePaths() {
+ return createdResourcePaths;
+ }
+
+ /**
+ * Returns the map of mock resources
+ */
+ public Map<String, LocalResource> getMockResources() {
+ return mockResources;
+ }
+
+ /**
+ * Returns the mock recovery manager
+ */
+ public IRecoveryManager getMockRecoveryManager() {
+ return mockRecoveryManager;
+ }
+
+ /**
+ * Returns the mock index for a path
+ */
+ public MockIndex getMockIndex(String resourcePath) {
+ return mockIndexes.get(resourcePath);
+ }
+
+ /**
+ * Returns the mock resource repository
+ */
+ public ILocalResourceRepository getMockResourceRepository() {
+ return mockResourceRepository;
+ }
+
+ /**
+ * Creates a mock index for a resource path and adds it to the mockIndexes map.
+ * This is used by DatasetLifecycleManagerLazyRecoveryTest to ensure consistent mock creation.
+ */
+ public void createMockIndexForPath(String resourcePath, int datasetId, int partition,
+ PrimaryIndexOperationTracker opTracker) throws HyracksDataException {
+ // Extract needed dataset ID from the path or use provided one
+ MockIndex mockIndex = new MockIndex(resourcePath, datasetId, datasetResourceMap);
+
+ // Set the operation tracker directly if provided
+ if (opTracker != null) {
+ mockIndex.setOperationTracker(opTracker);
+ }
+
+ // Store the mock index
+ mockIndexes.put(resourcePath, mockIndex);
+
+ // Create an answer that returns this mock index
+ DatasetLocalResource mockDatasetResource = null;
+ for (Map.Entry<String, LocalResource> entry : mockResources.entrySet()) {
+ if (entry.getKey().equals(resourcePath)) {
+ mockDatasetResource = (DatasetLocalResource) entry.getValue().getResource();
+ break;
+ }
+ }
+
+ if (mockDatasetResource != null) {
+ when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
+ }
+
+ LOGGER.debug("Created mock index for {}: {}", resourcePath, mockIndex);
+ }
+}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
new file mode 100644
index 0000000..a4b5a55
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/common/context/DatasetLifecycleManagerLazyRecoveryTest.java
@@ -0,0 +1,1223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the behavior of the DatasetLifecycleManager with lazy recovery enabled.
+ * This test class focuses on verifying that lazy recovery works correctly.
+ */
+public class DatasetLifecycleManagerLazyRecoveryTest {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private DatasetLifecycleManagerConcurrentTest concurrentTest;
+ private List<String> recoveredResourcePaths;
+ private Set<String> datasetPartitionPaths;
+ private Map<String, FileReference> pathToFileRefMap;
+ private ExecutorService executorService;
+
+ @Before
+ public void setUp() throws Exception {
+ // Create and initialize the base test
+ concurrentTest = new DatasetLifecycleManagerConcurrentTest();
+ concurrentTest.setUp();
+
+ // Enable lazy recovery
+ concurrentTest.setLazyRecoveryEnabled(true);
+ LOGGER.info("Lazy recovery enabled for all tests");
+
+ // Track recovered resources
+ recoveredResourcePaths = new ArrayList<>();
+ datasetPartitionPaths = new TreeSet<>();
+ pathToFileRefMap = new HashMap<>();
+
+ // Extract the dataset partition paths from the resource paths and log partition info
+ // These are the parent directories that contain multiple index files
+ // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
+ // Note: The replication factor is ALWAYS 0 in production and should be 0 in all tests
+ Pattern pattern = Pattern.compile("(.*?/partition_(\\d+)/[^/]+/[^/]+/[^/]+/(\\d+))/.+");
+
+ // Map to collect resources by partition for logging
+ Map<Integer, List<String>> resourcesByPartition = new HashMap<>();
+
+ for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+ Matcher matcher = pattern.matcher(resourcePath);
+ if (matcher.matches()) {
+ String partitionPath = matcher.group(1); // Now includes the replication factor
+ int partitionId = Integer.parseInt(matcher.group(2));
+ int replicationFactor = Integer.parseInt(matcher.group(3)); // Usually 0
+
+ LOGGER.info("Resource path: {}", resourcePath);
+ LOGGER.info(" Partition path: {}", partitionPath);
+ LOGGER.info(" Partition ID: {}", partitionId);
+ LOGGER.info(" Replication factor: {}", replicationFactor);
+
+ // Track resources by partition for logging
+ resourcesByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(resourcePath);
+
+ // Add to our partition paths for mocking
+ datasetPartitionPaths.add(partitionPath);
+
+ // Create a FileReference mock for this partition path
+ FileReference mockPartitionRef = mock(FileReference.class);
+ when(mockPartitionRef.getAbsolutePath()).thenReturn(partitionPath);
+ pathToFileRefMap.put(partitionPath, mockPartitionRef);
+ } else {
+ LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
+ }
+ }
+
+ // Log resources by partition for debugging
+ LOGGER.info("Found {} dataset partition paths", datasetPartitionPaths.size());
+ for (Map.Entry<Integer, List<String>> entry : resourcesByPartition.entrySet()) {
+ LOGGER.info("Partition {}: {} resources", entry.getKey(), entry.getValue().size());
+ for (String path : entry.getValue()) {
+ LOGGER.info(" - {}", path);
+ }
+ }
+
+ // Mock the IOManager.resolve() method to return our mock FileReferences
+ IIOManager mockIOManager = concurrentTest.getMockServiceContext().getIoManager();
+ doAnswer(inv -> {
+ String path = inv.getArgument(0);
+ // For each possible partition path, check if this is the path being resolved
+ for (String partitionPath : datasetPartitionPaths) {
+ if (path.equals(partitionPath)) {
+ LOGGER.info("Resolving path {} to FileReference", path);
+ return pathToFileRefMap.get(partitionPath);
+ }
+ }
+ // If no match, create a new mock FileReference
+ FileReference mockFileRef = mock(FileReference.class);
+ when(mockFileRef.getAbsolutePath()).thenReturn(path);
+ when(mockFileRef.toString()).thenReturn(path);
+ return mockFileRef;
+ }).when(mockIOManager).resolve(anyString());
+
+ // Create executor service for concurrent tests
+ executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ }
+
+ /**
+ * Tests that when an index is opened with lazy recovery enabled,
+ * all resources under the same dataset partition path are recovered together.
+ * This verifies the batch recovery behavior for all resources with the same prefix path.
+ */
+ @Test
+ public void testLazyRecoveryForEntirePartition() throws Exception {
+ LOGGER.info("Starting testLazyRecoveryForEntirePartition");
+
+ // Get resources for one dataset partition
+ String datasetPartitionPath = datasetPartitionPaths.iterator().next();
+ List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
+ .filter(path -> path.startsWith(datasetPartitionPath)).collect(Collectors.toList());
+
+ LOGGER.info("Testing partition path: {}", datasetPartitionPath);
+ LOGGER.info("Resources in this partition: {}", resourcesInPartition);
+
+ // Mock the resource repository to track calls to getResources
+ ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+ // Setup capture of resources that get recovered - using FileReference
+ doAnswer(inv -> {
+ List<FileReference> refs = inv.getArgument(1);
+ if (refs != null && !refs.isEmpty()) {
+ FileReference fileRef = refs.get(0);
+ String rootPath = fileRef.getAbsolutePath();
+ LOGGER.info("Looking up resources with FileReference: {}", fileRef);
+ LOGGER.info("FileReference absolute path: {}", rootPath);
+
+ // The rootPath from DatasetLifecycleManager now includes the replication factor
+ // e.g., storage/partition_1/Default/SDefault/ds0/0
+ LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
+
+ // Get all resources that match this root path
+ Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+ // Convert our string-keyed map to long-keyed map as expected by the API
+ // Note: We need to match exactly, not just prefix, because the rootPath now includes the replication factor
+ concurrentTest.getMockResources().entrySet().stream()
+ .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
+ .forEach(entry -> {
+ LocalResource resource = entry.getValue();
+ matchingResources.put(resource.getId(), resource);
+ // Record which resources were recovered for our test verification
+ recoveredResourcePaths.add(entry.getKey());
+ LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
+ resource.getId());
+ });
+
+ LOGGER.info("Found {} resources to recover", matchingResources.size());
+
+ // Return resources mapped by ID as the real implementation would
+ return matchingResources;
+ }
+ return Map.of();
+ }).when(mockRepository).getResources(any(), anyList());
+
+ // Setup capture for recoverIndexes calls
+ doAnswer(inv -> {
+ List<ILSMIndex> indexes = inv.getArgument(0);
+ if (indexes != null && !indexes.isEmpty()) {
+ LOGGER.info("Recovering {} indexes", indexes.size());
+ for (ILSMIndex index : indexes) {
+ LOGGER.info("Recovering index: {}", index);
+ }
+ }
+ return null;
+ }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
+
+ // Choose the first resource in the partition to register and open
+ String resourcePath = resourcesInPartition.get(0);
+
+ // Register the index
+ IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+ LOGGER.info("Successfully registered index at {}", resourcePath);
+
+ // Open the index - should trigger lazy recovery
+ concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+ LOGGER.info("Successfully opened index at {}", resourcePath);
+
+ // Verify the recovery manager was consulted
+ verify(concurrentTest.getMockRecoveryManager(), atLeastOnce()).isLazyRecoveryEnabled();
+
+ // Verify that all resources in the partition were considered for recovery
+ LOGGER.info("Verifying recovery for resources in partition: {}", resourcesInPartition);
+ LOGGER.info("Resources marked as recovered: {}", recoveredResourcePaths);
+
+ for (String path : resourcesInPartition) {
+ boolean wasRecovered = isResourcePathRecovered(path);
+ LOGGER.info("Resource {} - recovered: {}", path, wasRecovered);
+ assertTrue("Resource should have been considered for recovery: " + path, wasRecovered);
+ }
+
+ // Cleanup
+ concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ concurrentTest.getMockResources().remove(resourcePath);
+
+ // Verify cleanup was successful
+ assertNull("Index should no longer be registered",
+ concurrentTest.getDatasetLifecycleManager().get(resourcePath));
+ }
+
+ /**
+ * Tests that lazy recovery is only performed the first time an index is opened
+ * and is skipped on subsequent opens.
+ */
+ @Test
+ public void testLazyRecoveryOnlyHappensOnce() throws Exception {
+ LOGGER.info("Starting testLazyRecoveryOnlyHappensOnce");
+
+ // Get a resource path
+ String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
+ LOGGER.info("Testing with resource: {}", resourcePath);
+
+ // Find the dataset partition path for this resource
+ String partitionPath = null;
+ for (String path : datasetPartitionPaths) {
+ if (resourcePath.startsWith(path)) {
+ partitionPath = path;
+ break;
+ }
+ }
+ LOGGER.info("Resource belongs to partition: {}", partitionPath);
+
+ // Track recovery calls
+ List<String> recoveredPaths = new ArrayList<>();
+
+ // Mock repository to track resource lookups during recovery
+ final Map<Integer, Integer> recoveryCount = new HashMap<>();
+ recoveryCount.put(0, 0); // First recovery count
+ recoveryCount.put(1, 0); // Second recovery count
+
+ ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+ // Setup capture of resources that get recovered
+ doAnswer(inv -> {
+ List<FileReference> refs = inv.getArgument(1);
+ if (refs != null && !refs.isEmpty()) {
+ FileReference fileRef = refs.get(0);
+ String rootPath = fileRef.getAbsolutePath();
+
+ // Get all resources that match this root path
+ Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+ concurrentTest.getMockResources().entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
+ LocalResource resource = entry.getValue();
+ matchingResources.put(resource.getId(), resource);
+ recoveredPaths.add(entry.getKey());
+ });
+
+ // Increment recovery counter based on which open we're on
+ if (recoveryCount.get(1) == 0) {
+ recoveryCount.put(0, recoveryCount.get(0) + matchingResources.size());
+ LOGGER.info("First open recovery finding {} resources", matchingResources.size());
+ } else {
+ recoveryCount.put(1, recoveryCount.get(1) + matchingResources.size());
+ LOGGER.info("Second open recovery finding {} resources", matchingResources.size());
+ }
+
+ return matchingResources;
+ }
+ return Map.of();
+ }).when(mockRepository).getResources(any(), anyList());
+
+ // Mock recovery behavior to track calls
+ doAnswer(inv -> {
+ List<ILSMIndex> indexes = inv.getArgument(0);
+ if (indexes != null && !indexes.isEmpty()) {
+ for (ILSMIndex index : indexes) {
+ String path = index.toString();
+ LOGGER.info("Recovering index: {}", path);
+ }
+ }
+ return null;
+ }).when(concurrentTest.getMockRecoveryManager()).recoverIndexes(anyList());
+
+ // Register the index
+ IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+
+ // First open - should trigger recovery
+ LOGGER.info("First open of index");
+ concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+ LOGGER.info("First open recovery found {} resources", recoveryCount.get(0));
+
+ // Close the index
+ concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+
+ // Second open - should NOT trigger recovery again
+ LOGGER.info("Second open of index");
+ recoveryCount.put(1, 0); // Reset second recovery counter
+ concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+ LOGGER.info("Second open recovery found {} resources", recoveryCount.get(1));
+
+ // Verify recovery was skipped on second open
+ assertTrue("First open should recover resources", recoveryCount.get(0) > 0);
+ assertTrue("Recovery should not happen on subsequent opens",
+ recoveryCount.get(1) == 0 || recoveryCount.get(1) < recoveryCount.get(0));
+
+ // Cleanup
+ concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ concurrentTest.getMockResources().remove(resourcePath);
+ }
+
+ /**
+ * A comprehensive test that runs multiple operations with lazy recovery enabled.
+ * This test verifies that lazy recovery works correctly across various scenarios:
+ * - Opening multiple resources in the same partition
+ * - Operations across multiple partitions
+ * - Reference counting with multiple open/close
+ * - Proper unregistration after use
+ */
+ @Test
+ public void testComprehensiveLazyRecoveryWithMultipleResources() throws Exception {
+ LOGGER.info("Starting testComprehensiveLazyRecoveryWithMultipleResources");
+
+ // Create sets to track which resources have been recovered
+ Set<String> recoveredResources = new TreeSet<>();
+ Set<String> recoveredPartitions = new TreeSet<>();
+
+ // Mock repository to track resource lookups during recovery
+ ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+ // Setup capture of resources that get recovered
+ doAnswer(inv -> {
+ List<FileReference> refs = inv.getArgument(1);
+ if (refs != null && !refs.isEmpty()) {
+ FileReference fileRef = refs.get(0);
+ String rootPath = fileRef.getAbsolutePath();
+ LOGGER.info("Looking up resources with root path: {}", rootPath);
+
+ // Track this partition as being recovered
+ recoveredPartitions.add(rootPath);
+
+ // Get all resources that match this root path
+ Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+ concurrentTest.getMockResources().entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(rootPath)).forEach(entry -> {
+ LocalResource resource = entry.getValue();
+ matchingResources.put(resource.getId(), resource);
+ recoveredResources.add(entry.getKey());
+ LOGGER.info("Including resource for recovery: {} (ID: {})", entry.getKey(),
+ resource.getId());
+ });
+
+ return matchingResources;
+ }
+ return Map.of();
+ }).when(mockRepository).getResources(any(), anyList());
+
+ // Group resources by partition for easier testing
+ Map<String, List<String>> resourcesByPartition = new HashMap<>();
+
+ for (String partitionPath : datasetPartitionPaths) {
+ List<String> resourcesInPartition = concurrentTest.getCreatedResourcePaths().stream()
+ .filter(path -> path.startsWith(partitionPath)).collect(Collectors.toList());
+
+ resourcesByPartition.put(partitionPath, resourcesInPartition);
+ }
+
+ LOGGER.info("Found {} partitions with resources", resourcesByPartition.size());
+
+ // For each partition, perform a series of operations
+ for (Map.Entry<String, List<String>> entry : resourcesByPartition.entrySet()) {
+ String partitionPath = entry.getKey();
+ List<String> resources = entry.getValue();
+
+ LOGGER.info("Testing partition: {} with {} resources", partitionPath, resources.size());
+
+ // 1. Register and open the first resource - should trigger recovery for all resources in this partition
+ String firstResource = resources.get(0);
+ LOGGER.info("Registering and opening first resource: {}", firstResource);
+
+ IIndex firstIndex = concurrentTest.registerIfAbsentIndex(firstResource);
+ assertNotNull("First index should be registered successfully", firstIndex);
+
+ // Open the index - should trigger lazy recovery for all resources in this partition
+ concurrentTest.getDatasetLifecycleManager().open(firstResource);
+
+ // Verify partition was recovered
+ assertTrue("Partition should be marked as recovered", recoveredPartitions.contains(partitionPath));
+
+ // 2. Register and open a second resource if available - should NOT trigger recovery again
+ if (resources.size() > 1) {
+ // Clear recovery tracking before opening second resource
+ int beforeSize = recoveredResources.size();
+ String secondResource = resources.get(1);
+
+ LOGGER.info("Registering and opening second resource: {}", secondResource);
+ IIndex secondIndex = concurrentTest.registerIfAbsentIndex(secondResource);
+ assertNotNull("Second index should be registered successfully", secondIndex);
+
+ // Open the second index - should NOT trigger lazy recovery again for this partition
+ concurrentTest.getDatasetLifecycleManager().open(secondResource);
+
+ // Verify no additional recovery happened
+ int afterSize = recoveredResources.size();
+ LOGGER.info("Resources recovered before: {}, after: {}", beforeSize, afterSize);
+ // We might see some recovery of the specific resource, but not the whole partition again
+
+ // 3. Open and close the second resource multiple times to test reference counting
+ for (int i = 0; i < 3; i++) {
+ concurrentTest.getDatasetLifecycleManager().close(secondResource);
+ concurrentTest.getDatasetLifecycleManager().open(secondResource);
+ }
+
+ // Close and unregister the second resource
+ concurrentTest.getDatasetLifecycleManager().close(secondResource);
+ concurrentTest.getDatasetLifecycleManager().unregister(secondResource);
+ LOGGER.info("Successfully closed and unregistered second resource: {}", secondResource);
+ }
+
+ // 4. Close and unregister the first resource
+ concurrentTest.getDatasetLifecycleManager().close(firstResource);
+ concurrentTest.getDatasetLifecycleManager().unregister(firstResource);
+ LOGGER.info("Successfully closed and unregistered first resource: {}", firstResource);
+
+ // Verify both resources are truly unregistered
+ for (String resource : resources.subList(0, Math.min(2, resources.size()))) {
+ assertNull("Resource should be unregistered: " + resource,
+ concurrentTest.getDatasetLifecycleManager().get(resource));
+ }
+ }
+
+ // Summary of recovery
+ LOGGER.info("Recovered {} partitions out of {}", recoveredPartitions.size(), datasetPartitionPaths.size());
+ LOGGER.info("Recovered {} resources out of {}", recoveredResources.size(),
+ concurrentTest.getCreatedResourcePaths().size());
+ }
+
+ /**
+ * Tests that open operation succeeds after unregister + register sequence.
+ * This verifies the operation tracker is properly reset when re-registering.
+ */
+ @Test
+ public void testUnregisterRegisterIfAbsentOpenSequence() throws Exception {
+ LOGGER.info("Starting testUnregisterRegisterOpenSequence");
+
+ // Get a resource path
+ String resourcePath = concurrentTest.getCreatedResourcePaths().get(0);
+ LOGGER.info("Testing with resource: {}", resourcePath);
+
+ // First lifecycle - register, open, close, unregister
+ IIndex firstIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully in first lifecycle", firstIndex);
+
+ concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+ LOGGER.info("Successfully opened index in first lifecycle");
+
+ concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ LOGGER.info("Successfully closed and unregistered index in first lifecycle");
+
+ // Second lifecycle - register again, open, close, unregister
+ IIndex secondIndex = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully in second lifecycle", secondIndex);
+
+ // This open should succeed if the operation tracker was properly reset
+ try {
+ concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+ LOGGER.info("Successfully opened index in second lifecycle");
+ } catch (Exception e) {
+ LOGGER.error("Failed to open index in second lifecycle", e);
+ fail("Open should succeed in second lifecycle: " + e.getMessage());
+ }
+
+ // Cleanup
+ concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ LOGGER.info("Successfully completed two full lifecycles with the same resource");
+ }
+
+ /**
+ * Tests that recovery works correctly across different partitions.
+ * This test specifically verifies that partition IDs are correctly propagated
+ * during the recovery process.
+ */
+ @Test
+ public void testRecoveryAcrossPartitions() throws Exception {
+ LOGGER.info("Starting testRecoveryAcrossPartitions");
+
+ // Skip test if we don't have at least 2 partitions
+ if (datasetPartitionPaths.size() < 2) {
+ LOGGER.info("Skipping testRecoveryAcrossPartitions - need at least 2 partitions");
+ return;
+ }
+
+ // Create maps to track recovery by partition
+ Map<Integer, Boolean> partitionRecovered = new HashMap<>();
+ Map<Integer, Set<String>> resourcesRecoveredByPartition = new HashMap<>();
+
+ // Find resources from different partitions
+ Map<Integer, String> resourcesByPartition = new HashMap<>();
+ // Format: storage/partition_X/DBName/ScopeName/DatasetName/ReplicationFactor/IndexName
+ Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
+
+ for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+ Matcher matcher = pattern.matcher(resourcePath);
+ if (matcher.matches()) {
+ int partitionId = Integer.parseInt(matcher.group(1));
+ resourcesByPartition.putIfAbsent(partitionId, resourcePath);
+ partitionRecovered.put(partitionId, false);
+ resourcesRecoveredByPartition.put(partitionId, new TreeSet<>());
+ }
+ }
+
+ LOGGER.info("Found resources in {} different partitions: {}", resourcesByPartition.size(),
+ resourcesByPartition.keySet());
+
+ // Mock repository to track which partition's resources are being recovered
+ ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+
+ // Setup capture of resources that get recovered
+ doAnswer(inv -> {
+ List<FileReference> refs = inv.getArgument(1);
+ if (refs != null && !refs.isEmpty()) {
+ FileReference fileRef = refs.get(0);
+ String rootPath = fileRef.getAbsolutePath();
+ LOGGER.info("Looking up resources with root path: {}", rootPath);
+
+ // The rootPath now includes the replication factor
+ LOGGER.info("Using corrected rootPath for resource lookup: {}", rootPath);
+
+ // Get the partition ID from the path
+ Matcher matcher = Pattern.compile(".*?/partition_(\\d+)/.+").matcher(rootPath);
+ if (matcher.matches()) {
+ int partitionId = Integer.parseInt(matcher.group(1));
+ partitionRecovered.put(partitionId, true);
+ LOGGER.info("Recovery requested for partition {}", partitionId);
+ } else {
+ LOGGER.warn("Could not extract partition ID from path: {}", rootPath);
+ }
+
+ // Get all resources that match this root path
+ Map<Long, LocalResource> matchingResources = new HashMap<>();
+
+ concurrentTest.getMockResources().entrySet().stream()
+ .filter(entry -> entry.getKey().equals(rootPath) || entry.getKey().startsWith(rootPath + "/"))
+ .forEach(entry -> {
+ LocalResource resource = entry.getValue();
+ matchingResources.put(resource.getId(), resource);
+
+ // Extract partition ID from path
+ Matcher m = Pattern.compile(".*?/partition_(\\d+)/.+").matcher(entry.getKey());
+ if (m.matches()) {
+ int partitionId = Integer.parseInt(m.group(1));
+ resourcesRecoveredByPartition.get(partitionId).add(entry.getKey());
+ LOGGER.info("Resource {} added to recovery for partition {}", entry.getKey(),
+ partitionId);
+ }
+ });
+
+ LOGGER.info("Found {} resources to recover", matchingResources.size());
+ return matchingResources;
+ }
+ return Map.of();
+ }).when(mockRepository).getResources(any(), anyList());
+
+ // Open resources from each partition
+ for (Map.Entry<Integer, String> entry : resourcesByPartition.entrySet()) {
+ int partitionId = entry.getKey();
+ String resourcePath = entry.getValue();
+
+ LOGGER.info("Testing recovery for partition {} with resource {}", partitionId, resourcePath);
+
+ // Register and open the resource
+ IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+
+ // Get the LocalResource and verify its partition ID
+ LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+ int resourcePartitionId = datasetLocalResource.getPartition();
+
+ LOGGER.info("Resource {} has partition ID {} (expected {})", resourcePath, resourcePartitionId,
+ partitionId);
+
+ // Open the index - should trigger lazy recovery for this partition
+ concurrentTest.getDatasetLifecycleManager().open(resourcePath);
+ LOGGER.info("Successfully opened index for partition {}", partitionId);
+
+ // Verify this partition was recovered
+ assertTrue("Partition " + partitionId + " should be marked as recovered",
+ partitionRecovered.get(partitionId));
+
+ // Verify resources for this partition were recovered
+ Set<String> recoveredResources = resourcesRecoveredByPartition.get(partitionId);
+ assertTrue("At least one resource should be recovered for partition " + partitionId,
+ !recoveredResources.isEmpty());
+
+ // Cleanup
+ concurrentTest.getDatasetLifecycleManager().close(resourcePath);
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ }
+
+ // Summary of recovery by partition
+ for (Map.Entry<Integer, Boolean> entry : partitionRecovered.entrySet()) {
+ int partitionId = entry.getKey();
+ boolean recovered = entry.getValue();
+ int resourceCount = resourcesRecoveredByPartition.get(partitionId).size();
+
+ LOGGER.info("Partition {}: Recovered={}, Resources recovered={}", partitionId, recovered, resourceCount);
+ }
+ }
+
+ /**
+ * Tests that the partition ID in DatasetLocalResource objects is correctly set.
+ * This test specifically verifies that partition IDs match the resource path.
+ */
+ @Test
+ public void testPartitionIdMatchesPath() throws Exception {
+ LOGGER.info("Starting testPartitionIdMatchesPath");
+
+ // Find resources from different partitions
+ Map<Integer, Set<String>> resourcesByPartition = new HashMap<>();
+ Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
+
+ // First, collect all resources by partition
+ for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+ Matcher matcher = pattern.matcher(resourcePath);
+ if (matcher.matches()) {
+ int partitionId = Integer.parseInt(matcher.group(1));
+ resourcesByPartition.computeIfAbsent(partitionId, k -> new TreeSet<>()).add(resourcePath);
+ }
+ }
+
+ boolean incorrectPartitionIdFound = false;
+
+ // Examine each resource and check its partition ID
+ for (Map.Entry<Integer, Set<String>> entry : resourcesByPartition.entrySet()) {
+ int expectedPartitionId = entry.getKey();
+ Set<String> resources = entry.getValue();
+
+ LOGGER.info("Checking {} resources for partition {}", resources.size(), expectedPartitionId);
+
+ for (String resourcePath : resources) {
+ LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
+ if (localResource != null) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+ int actualPartitionId = datasetLocalResource.getPartition();
+
+ LOGGER.info("Resource: {}, Expected partition: {}, Actual partition: {}", resourcePath,
+ expectedPartitionId, actualPartitionId);
+
+ if (expectedPartitionId != actualPartitionId) {
+ LOGGER.error("PARTITION ID MISMATCH for resource {}! Expected: {}, Actual: {}", resourcePath,
+ expectedPartitionId, actualPartitionId);
+ incorrectPartitionIdFound = true;
+ }
+ } else {
+ LOGGER.warn("Resource not found in mockResources: {}", resourcePath);
+ }
+ }
+ }
+
+ // This assertion will fail if any partition ID is incorrect
+ assertFalse("Incorrect partition IDs found in resources", incorrectPartitionIdFound);
+
+ // Now test DatasetLifecycleManager.getDatasetLocalResource directly
+ LOGGER.info("Testing DatasetLifecycleManager.getDatasetLocalResource directly");
+
+ // Find a resource in partition 1 (if available)
+ Set<String> partition1Resources = resourcesByPartition.getOrDefault(1, Collections.emptySet());
+ if (!partition1Resources.isEmpty()) {
+ String resourcePath = partition1Resources.iterator().next();
+ LOGGER.info("Testing getDatasetLocalResource with partition 1 resource: {}", resourcePath);
+
+ // Register the resource
+ IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully", index);
+
+ // Directly access the DatasetLocalResource to check partition ID
+ // We'll need to use reflection since getDatasetLocalResource is private
+ try {
+ // Get the private method
+ Method getDatasetLocalResourceMethod =
+ DatasetLifecycleManager.class.getDeclaredMethod("getDatasetLocalResource", String.class);
+ getDatasetLocalResourceMethod.setAccessible(true);
+
+ // Invoke the method
+ DatasetLocalResource result = (DatasetLocalResource) getDatasetLocalResourceMethod
+ .invoke(concurrentTest.getDatasetLifecycleManager(), resourcePath);
+
+ assertNotNull("DatasetLocalResource should not be null", result);
+ int actualPartitionId = result.getPartition();
+ LOGGER.info("getDatasetLocalResource for {} returned partition ID: {}", resourcePath,
+ actualPartitionId);
+ assertEquals("Partition ID should be 1", 1, actualPartitionId);
+ } catch (Exception e) {
+ LOGGER.error("Error accessing getDatasetLocalResource method", e);
+ fail("Failed to access getDatasetLocalResource method: " + e.getMessage());
+ }
+
+ // Clean up
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ } else {
+ LOGGER.info("No resources found in partition 1, skipping direct test");
+ }
+ }
+
+ /**
+ * Tests that the mock resources are correctly set up with the proper partition IDs.
+ * This test directly examines the mockResources map to verify the mocks.
+ */
+ @Test
+ public void testMockResourcesSetup() throws Exception {
+ LOGGER.info("Starting testMockResourcesSetup");
+
+ // Extract partition IDs from paths and compare with DatasetLocalResource partitions
+ Pattern pattern = Pattern.compile(".*?/partition_(\\d+)/.+");
+ int correctResources = 0;
+ int incorrectResources = 0;
+
+ LOGGER.info("Total mock resources: {}", concurrentTest.getMockResources().size());
+
+ // Print out all setupMockResource calls that would be needed to recreate the resources
+ for (Map.Entry<String, LocalResource> entry : concurrentTest.getMockResources().entrySet()) {
+ String resourcePath = entry.getKey();
+ LocalResource localResource = entry.getValue();
+
+ // Extract the expected partition ID from the path
+ Matcher matcher = pattern.matcher(resourcePath);
+ if (matcher.matches()) {
+ int expectedPartitionId = Integer.parseInt(matcher.group(1));
+
+ // Get the actual partition ID from the resource
+ Object resourceObj = localResource.getResource();
+ if (resourceObj instanceof DatasetLocalResource) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) resourceObj;
+ int actualPartitionId = datasetLocalResource.getPartition();
+ int datasetId = datasetLocalResource.getDatasetId();
+ long resourceId = localResource.getId();
+
+ // Log resource details
+ LOGGER.info(
+ "Resource: {}, Expected Partition: {}, Actual Partition: {}, Dataset ID: {}, Resource ID: {}",
+ resourcePath, expectedPartitionId, actualPartitionId, datasetId, resourceId);
+
+ // Generate the setupMockResource call that would create this resource
+ LOGGER.info("setupMockResource(\"{}\", {}, {}, {});", resourcePath, datasetId, expectedPartitionId,
+ resourceId);
+
+ if (expectedPartitionId == actualPartitionId) {
+ correctResources++;
+ } else {
+ incorrectResources++;
+ LOGGER.error("PARTITION MISMATCH in mock resources: {} (expected: {}, actual: {})",
+ resourcePath, expectedPartitionId, actualPartitionId);
+ }
+ } else {
+ LOGGER.warn("Resource object is not a DatasetLocalResource: {}", resourceObj);
+ }
+ } else {
+ LOGGER.warn("Resource path doesn't match expected pattern: {}", resourcePath);
+ }
+ }
+
+ LOGGER.info("Resources with correct partition IDs: {}", correctResources);
+ LOGGER.info("Resources with incorrect partition IDs: {}", incorrectResources);
+
+ // Fail the test if any resources have incorrect partition IDs
+ assertEquals("All resources should have correct partition IDs", 0, incorrectResources);
+
+ // Now test a direct mock creation and retrieval to see if that works correctly
+ LOGGER.info("Testing direct mock creation and retrieval");
+
+ // Create a test mock resource with partition 1
+ String testPath = "test/partition_1/Default/SDefault/testDs/0/testIndex"; // 0 is replication factor (always 0)
+ int testDatasetId = 999;
+ int testPartition = 1;
+ long testResourceId = 12345;
+
+ // Create a mock dataset local resource
+ DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
+ when(mockDatasetResource.getDatasetId()).thenReturn(testDatasetId);
+ when(mockDatasetResource.getPartition()).thenReturn(testPartition);
+
+ // Create a mock LSMINDEX (not just IIndex) since recovery expects ILSMIndex
+ ILSMIndex mockIndex = mock(ILSMIndex.class);
+ when(mockIndex.toString()).thenReturn(testPath);
+
+ // Add necessary behavior for LSM operations
+ when(mockIndex.isDurable()).thenReturn(true);
+ when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
+
+ // Allow this resource to be created
+ when(mockDatasetResource.createInstance(any())).thenReturn(mockIndex);
+
+ // Create a mock local resource
+ LocalResource mockLocalResource = mock(LocalResource.class);
+ when(mockLocalResource.getId()).thenReturn(testResourceId);
+ when(mockLocalResource.getPath()).thenReturn(testPath);
+ when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
+
+ // Add to mock resources map
+ concurrentTest.getMockResources().put(testPath, mockLocalResource);
+
+ // Create a mock file reference
+ FileReference mockFileRef = mock(FileReference.class);
+ when(mockFileRef.getRelativePath()).thenReturn(testPath);
+ when(mockFileRef.getAbsolutePath()).thenReturn(testPath);
+ when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(testPath))
+ .thenReturn(mockFileRef);
+ }
+
+ /**
+ * Creates a mock resource for testing.
+ */
+ private void setupMockResourceForTest(String resourcePath, int datasetId, int partition, long resourceId)
+ throws HyracksDataException {
+ // Create a mock dataset local resource
+ DatasetLocalResource mockDatasetResource = mock(DatasetLocalResource.class);
+ when(mockDatasetResource.getDatasetId()).thenReturn(datasetId);
+ when(mockDatasetResource.getPartition()).thenReturn(partition);
+
+ // Ensure we have a dataset resource
+ DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
+
+ // Create op tracker if needed
+ PrimaryIndexOperationTracker opTracker = datasetResource.getOpTracker(partition);
+ if (opTracker == null) {
+ opTracker = mock(PrimaryIndexOperationTracker.class);
+ datasetResource.setPrimaryIndexOperationTracker(partition, opTracker);
+ LOGGER.info("Created operation tracker for dataset {} partition {}", datasetId, partition);
+ }
+
+ // Create a mock local resource
+ LocalResource mockLocalResource = mock(LocalResource.class);
+ when(mockLocalResource.getId()).thenReturn(resourceId);
+ when(mockLocalResource.getPath()).thenReturn(resourcePath);
+ when(mockLocalResource.getResource()).thenReturn(mockDatasetResource);
+
+ // Add to mock resources map
+ concurrentTest.getMockResources().put(resourcePath, mockLocalResource);
+
+ // Create a mock file reference
+ FileReference mockFileRef = mock(FileReference.class);
+ when(mockFileRef.getRelativePath()).thenReturn(resourcePath);
+ when(mockFileRef.getAbsolutePath()).thenReturn(resourcePath);
+ when(concurrentTest.getMockServiceContext().getIoManager().resolveAbsolutePath(resourcePath))
+ .thenReturn(mockFileRef);
+
+ // Ensure the mock index is the one that will be returned during registration
+ concurrentTest.createMockIndexForPath(resourcePath, datasetId, partition, opTracker);
+ }
+
+ /**
+ * Tests that unregister operation succeeds during lazy recovery.
+ * This test verifies that resources are correctly unregistered during lazy recovery.
+ */
+ @Test
+ public void testConcurrentUnregisterDuringLazyRecovery() throws Exception {
+ LOGGER.info("Starting testConcurrentUnregisterDuringLazyRecovery");
+
+ // We need at least 6 resources for a good test
+ final int REQUIRED_RESOURCES = 6;
+
+ // Get multiple resources from the same dataset and partition
+ List<String> resourcesInSamePartition = findResourcesInSamePartition(REQUIRED_RESOURCES);
+
+ // If we don't have enough resources, create more
+ if (resourcesInSamePartition.size() < REQUIRED_RESOURCES) {
+ LOGGER.info("Only found {} resources, creating additional resources", resourcesInSamePartition.size());
+
+ // Get dataset and partition info from the existing resources, or create new ones
+ int datasetId;
+ int partitionId;
+ String datasetName;
+
+ if (!resourcesInSamePartition.isEmpty()) {
+ // Use the same dataset and partition as existing resources
+ String existingResource = resourcesInSamePartition.get(0);
+ LocalResource localResource = concurrentTest.getMockResources().get(existingResource);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+ datasetId = datasetLocalResource.getDatasetId();
+ partitionId = datasetLocalResource.getPartition();
+
+ // Extract dataset name from the existing resource path
+ Pattern datasetPattern = Pattern.compile(".*?/partition_\\d+/[^/]+/[^/]+/([^/]+)/\\d+/.*");
+ Matcher matcher = datasetPattern.matcher(existingResource);
+ if (matcher.matches()) {
+ datasetName = matcher.group(1);
+ LOGGER.info("Using dataset name from existing resource: {}", datasetName);
+ } else {
+ datasetName = "testDs";
+ LOGGER.warn("Could not extract dataset name from {}, using default: {}", existingResource,
+ datasetName);
+ }
+ } else {
+ // Create new dataset and partition
+ datasetId = 999; // Use a unique dataset ID
+ partitionId = 1; // Use partition 1
+ datasetName = "testDs";
+ }
+
+ LOGGER.info("Creating resources for dataset {} partition {}", datasetId, partitionId);
+
+ // Create additional resources until we reach the required count
+ for (int i = resourcesInSamePartition.size(); i < REQUIRED_RESOURCES; i++) {
+ String indexName = "test_index_" + i;
+ // Use the dataset name extracted from existing resources
+ String resourcePath = String.format("%s/partition_%d/%s/%s/%s/0/%s", "storage", partitionId, "Default",
+ "SDefault", datasetName, indexName);
+
+ // Create new mock resources
+ setupMockResourceForTest(resourcePath, datasetId, partitionId, datasetId * 1000L + i);
+ resourcesInSamePartition.add(resourcePath);
+ // LOGGER.info("Created additional resource: {}", resourcePath);
+ }
+ }
+
+ LOGGER.info("Have {} resources for test", resourcesInSamePartition.size());
+
+ // Register ALL resources
+ List<String> registeredResources = new ArrayList<>();
+ for (String resourcePath : resourcesInSamePartition) {
+ IIndex index = concurrentTest.registerIfAbsentIndex(resourcePath);
+ assertNotNull("Index should be registered successfully: " + resourcePath, index);
+ registeredResources.add(resourcePath);
+ LOGGER.info("Registered resource: {}", resourcePath);
+ }
+
+ // We'll use 1/3 of resources for unregistering
+ int unregisterCount = Math.max(2, resourcesInSamePartition.size() / 3);
+
+ // Split resources: one for opening, several for unregistering, rest for recovery
+ String resourceForOpen = registeredResources.get(0);
+ List<String> resourcesToUnregister = registeredResources.subList(1, 1 + unregisterCount);
+ List<String> otherResources = registeredResources.subList(1 + unregisterCount, registeredResources.size());
+
+ LOGGER.info("Using {} resources: 1 for open, {} for unregistering, {} for verification",
+ registeredResources.size(), resourcesToUnregister.size(), otherResources.size());
+
+ // Get the dataset and partition ID for tracking
+ LocalResource localResourceA = concurrentTest.getMockResources().get(resourceForOpen);
+ DatasetLocalResource datasetLocalResourceA = (DatasetLocalResource) localResourceA.getResource();
+ int datasetId = datasetLocalResourceA.getDatasetId();
+ int partitionId = datasetLocalResourceA.getPartition();
+
+ LOGGER.info("Test using dataset {} partition {}", datasetId, partitionId);
+
+ // Set up mock for getResources to return actual resources during recovery
+ ILocalResourceRepository mockRepository = concurrentTest.getMockResourceRepository();
+ doAnswer(inv -> {
+ List<FileReference> refs = inv.getArgument(1);
+ if (refs != null && !refs.isEmpty()) {
+ // Create a map of resources to return
+ Map<Long, LocalResource> result = new HashMap<>();
+ // Add our test resources from the same partition
+ for (String path : registeredResources) {
+ LocalResource res = concurrentTest.getMockResources().get(path);
+ if (res != null) {
+ result.put(res.getId(), res);
+ LOGGER.info("Adding resource to recovery batch: {}", path);
+ }
+ }
+ return result;
+ }
+ return Collections.emptyMap();
+ }).when(mockRepository).getResources(any(), any());
+
+ // Ensure operation tracker is properly initialized for the dataset resources
+ DatasetResource datasetResource = concurrentTest.getDatasetLifecycleManager().getDatasetLifecycle(datasetId);
+ if (datasetResource.getOpTracker(partitionId) == null) {
+ PrimaryIndexOperationTracker opTracker = mock(PrimaryIndexOperationTracker.class);
+ datasetResource.setPrimaryIndexOperationTracker(partitionId, opTracker);
+ LOGGER.info("Created and set operation tracker for dataset {} partition {}", datasetId, partitionId);
+ }
+
+ // Verify operation tracker exists before proceeding
+ assertNotNull("Operation tracker should be initialized for partition " + partitionId,
+ datasetResource.getOpTracker(partitionId));
+
+ // Execute operations with controlled timing
+ CountDownLatch recoveryStartedLatch = new CountDownLatch(1);
+ CountDownLatch unregisterCompleteLatch = new CountDownLatch(1);
+
+ // Mock the recovery to signal when it starts
+ IRecoveryManager mockRecoveryManager = concurrentTest.getMockRecoveryManager();
+ doAnswer(invocation -> {
+ List<ILSMIndex> indexes = invocation.getArgument(0);
+ LOGGER.info("Recovery started with {} indexes", indexes.size());
+
+ // Signal that recovery has started
+ recoveryStartedLatch.countDown();
+
+ // Wait for unregister to complete before continuing
+ LOGGER.info("Waiting for unregister to complete");
+ unregisterCompleteLatch.await(10, TimeUnit.SECONDS);
+ LOGGER.info("Continuing with recovery after unregister");
+
+ // Mark resources as recovered
+ for (ILSMIndex index : indexes) {
+ String path = index.toString();
+ recoveredResourcePaths.add(path);
+ // Remove individual resource logs
+ }
+ // Add a summary after the loop
+ LOGGER.info("Marked {} resources as recovered", indexes.size());
+
+ return null;
+ }).when(mockRecoveryManager).recoverIndexes(anyList());
+
+ // Thread 1: Open resource for open (will trigger recovery for all)
+ Future<?> openFuture = executorService.submit(() -> {
+ try {
+ LOGGER.info("Thread 1: Opening resource to trigger recovery...");
+ concurrentTest.getDatasetLifecycleManager().open(resourceForOpen);
+ LOGGER.info("Thread 1: Successfully opened resource");
+ } catch (Exception e) {
+ LOGGER.error("Thread 1: Error opening resource", e);
+ }
+ });
+
+ // Wait for recovery to start
+ LOGGER.info("Waiting for recovery to start");
+ boolean recoveryStarted = recoveryStartedLatch.await(10, TimeUnit.SECONDS);
+ assertTrue("Recovery should have started", recoveryStarted);
+ LOGGER.info("Recovery has started");
+
+ // Thread 2: Unregister multiple resources while recovery is happening
+ Future<?> unregisterFuture = executorService.submit(() -> {
+ try {
+ LOGGER.info("Thread 2: Unregistering {} resources...", resourcesToUnregister.size());
+
+ for (String resourcePath : resourcesToUnregister) {
+ concurrentTest.getDatasetLifecycleManager().unregister(resourcePath);
+ }
+
+ // Signal that unregister is complete
+ unregisterCompleteLatch.countDown();
+ LOGGER.info("Thread 2: Successfully unregistered all resources");
+ } catch (Exception e) {
+ LOGGER.error("Thread 2: Error unregistering resources", e);
+ // Ensure we don't deadlock in case of failure
+ unregisterCompleteLatch.countDown();
+ }
+ });
+
+ // Wait for unregister to complete
+ unregisterFuture.get(50, TimeUnit.SECONDS);
+
+ // Wait for open to complete
+ openFuture.get(10, TimeUnit.SECONDS);
+
+ // Verify expectations
+ LOGGER.info("Checking final state");
+
+ // Check which resources were recovered
+ List<String> recoveredResources = new ArrayList<>();
+ List<String> nonRecoveredResources = new ArrayList<>();
+
+ for (String resourcePath : registeredResources) {
+ if (isResourcePathRecovered(resourcePath)) {
+ recoveredResources.add(resourcePath);
+ } else {
+ nonRecoveredResources.add(resourcePath);
+ }
+ }
+
+ LOGGER.info("Results: {} resources recovered, {} resources not recovered", recoveredResources.size(),
+ nonRecoveredResources.size());
+
+ // Resource for open should have been opened successfully
+ assertTrue("Resource for open should be recovered", isResourcePathRecovered(resourceForOpen));
+
+ // Resources to unregister should be unregistered
+ for (String resourcePath : resourcesToUnregister) {
+ IIndex indexAfter = concurrentTest.getDatasetLifecycleManager().get(resourcePath);
+ assertNull("Resource should be unregistered: " + resourcePath, indexAfter);
+ }
+
+ // Other resources should either be recovered or not, depending on timing
+ LOGGER.info("Completed testConcurrentUnregisterDuringLazyRecovery");
+ }
+
+ /**
+ * Helper method to find resources that belong to the same dataset and partition.
+ *
+ * @param minCount Minimum number of resources needed
+ * @return List of resource paths from the same dataset and partition
+ */
+ private List<String> findResourcesInSamePartition(int minCount) {
+ // Group resources by dataset ID and partition ID
+ Map<String, List<String>> resourcesByDatasetAndPartition = new HashMap<>();
+
+ LOGGER.info("Looking for at least {} resources in the same partition", minCount);
+
+ for (String resourcePath : concurrentTest.getCreatedResourcePaths()) {
+ LocalResource localResource = concurrentTest.getMockResources().get(resourcePath);
+ if (localResource != null) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+ int datasetId = datasetLocalResource.getDatasetId();
+ int partitionId = datasetLocalResource.getPartition();
+
+ // Create a composite key using datasetId and partitionId
+ String key = datasetId + "_" + partitionId;
+ resourcesByDatasetAndPartition.computeIfAbsent(key, k -> new ArrayList<>()).add(resourcePath);
+ }
+ }
+
+ // Just log summary, not details
+ LOGGER.info("Found {} groups of resources by dataset/partition", resourcesByDatasetAndPartition.size());
+
+ // Find a group with enough resources
+ for (List<String> resources : resourcesByDatasetAndPartition.values()) {
+ if (resources.size() >= minCount) {
+ String firstResource = resources.get(0);
+ LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+
+ LOGGER.info("Found suitable group with {} resources (dataset={}, partition={})", resources.size(),
+ datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
+ return resources;
+ }
+ }
+
+ // If no group has enough resources, return the one with the most
+ List<String> bestGroup = resourcesByDatasetAndPartition.values().stream()
+ .max(Comparator.comparingInt(List::size)).orElse(Collections.emptyList());
+
+ if (!bestGroup.isEmpty()) {
+ String firstResource = bestGroup.get(0);
+ LocalResource localResource = concurrentTest.getMockResources().get(firstResource);
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) localResource.getResource();
+
+ LOGGER.info("Using best available group: {} resources (dataset={}, partition={})", bestGroup.size(),
+ datasetLocalResource.getDatasetId(), datasetLocalResource.getPartition());
+ } else {
+ LOGGER.warn("Could not find any resources in the same partition");
+ }
+
+ return bestGroup;
+ }
+
+ /**
+ * Helper method to check if a resource path is contained in the recovered paths.
+ * This handles potential differences between the resource path string and the index.toString() format.
+ */
+ private boolean isResourcePathRecovered(String resourcePath) {
+ // Direct match check
+ if (recoveredResourcePaths.contains(resourcePath)) {
+ return true;
+ }
+
+ // For each recovered path, check if it contains the resource path
+ for (String recoveredPath : recoveredResourcePaths) {
+ if (recoveredPath.equals(resourcePath) ||
+ // Check if the recoveredPath contains the resourcePath (for formatted toString values)
+ recoveredPath.contains(resourcePath)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cb4e068..320d089 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -138,9 +138,13 @@
@Override
public LocalResource get(String relativePath) throws HyracksDataException {
+ LocalResource resource = getLocalResourceFromCache(relativePath);
+ if (resource != null) {
+ return resource;
+ }
beforeReadAccess();
try {
- LocalResource resource = resourceCache.getIfPresent(relativePath);
+ resource = resourceCache.getIfPresent(relativePath);
if (resource == null) {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
resource = readLocalResource(resourceFile);
@@ -154,6 +158,20 @@
}
}
+ private LocalResource getLocalResourceFromCache(String relativePath) {
+ LocalResource resource;
+ beforeWriteAccess();
+ try {
+ resource = resourceCache.getIfPresent(relativePath);
+ if (resource != null) {
+ return resource;
+ }
+ } finally {
+ afterWriteAccess();
+ }
+ return null;
+ }
+
@SuppressWarnings("squid:S1181")
@Override
public void insert(LocalResource resource) throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index f5edc74..3f87c6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -68,7 +68,7 @@
ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
localResourceRepository = localResourceRepositoryFactory.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
- lcManager = new IndexLifecycleManager();
+ lcManager = new IndexLifecycleManager(appCtx, localResourceRepository);
}
public void close() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 8a67c71..b03f54f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -109,6 +109,11 @@
} catch (IOException e) {
throw HyracksDataException.create(e);
}
- lcManager.register(resourceRelPath, index);
+ IIndex registeredIndex = lcManager.registerIfAbsent(resourceRelPath, index);
+ if (registeredIndex != index) {
+ // some other thread has registered the index
+ // indicating this is not the first time, the index is being created
+ throw new HyracksDataException("Index with resource ID " + resourceId + " already exists.");
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index b79c3b1..550ad24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -20,32 +20,25 @@
package org.apache.hyracks.storage.am.common.dataflow;
import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+@NotThreadSafe
public class IndexDataflowHelper implements IIndexDataflowHelper {
- private static final Logger LOGGER = LogManager.getLogger();
- private final INCServiceContext ctx;
private final IResourceLifecycleManager<IIndex> lcManager;
private final ILocalResourceRepository localResourceRepository;
private final FileReference resourceRef;
private IIndex index;
- public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef)
- throws HyracksDataException {
- this.ctx = ctx;
+ public IndexDataflowHelper(final INCServiceContext ctx, IStorageManager storageMgr, FileReference resourceRef) {
this.lcManager = storageMgr.getLifecycleManager(ctx);
this.localResourceRepository = storageMgr.getLocalResourceRepository(ctx);
this.resourceRef = resourceRef;
@@ -58,56 +51,18 @@
@Override
public void open() throws HyracksDataException {
- //Get local resource file
- synchronized (lcManager) {
- index = lcManager.get(resourceRef.getRelativePath());
- if (index == null) {
- LocalResource lr = readIndex();
- lcManager.register(lr.getPath(), index);
- }
- lcManager.open(resourceRef.getRelativePath());
- }
- }
-
- private LocalResource readIndex() throws HyracksDataException {
- // Get local resource
- LocalResource lr = getResource();
- if (lr == null) {
- throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourceRef.getRelativePath());
- }
- IResource resource = lr.getResource();
- index = resource.createInstance(ctx);
- return lr;
+ index = lcManager.registerIfAbsent(resourceRef.getRelativePath(), index);
+ lcManager.open(resourceRef.getRelativePath());
}
@Override
public void close() throws HyracksDataException {
- synchronized (lcManager) {
- lcManager.close(resourceRef.getRelativePath());
- }
+ lcManager.close(resourceRef.getRelativePath());
}
@Override
public void destroy() throws HyracksDataException {
- LOGGER.log(Level.INFO, "Dropping index " + resourceRef.getRelativePath() + " on node " + ctx.getNodeId());
- synchronized (lcManager) {
- index = lcManager.get(resourceRef.getRelativePath());
- if (index != null) {
- lcManager.unregister(resourceRef.getRelativePath());
- } else {
- readIndex();
- }
-
- if (getResourceId() != -1) {
- localResourceRepository.delete(resourceRef.getRelativePath());
- }
- index.destroy();
- }
- }
-
- private long getResourceId() throws HyracksDataException {
- LocalResource lr = localResourceRepository.get(resourceRef.getRelativePath());
- return lr == null ? -1 : lr.getId();
+ lcManager.destroy(resourceRef.getRelativePath());
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index ab301a9..dd80bcb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -25,24 +25,33 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
+import org.apache.hyracks.storage.common.LocalResource;
public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, ILifeCycleComponent {
private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
private final Map<String, IndexInfo> indexInfos;
private final long memoryBudget;
+ private final INCServiceContext appCtx;
+ private final ILocalResourceRepository resourceRepository;
private long memoryUsed;
- public IndexLifecycleManager() {
- this(DEFAULT_MEMORY_BUDGET);
+ public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository) {
+ this(appCtx, resourceRepository, DEFAULT_MEMORY_BUDGET);
}
- public IndexLifecycleManager(long memoryBudget) {
+ public IndexLifecycleManager(INCServiceContext appCtx, ILocalResourceRepository resourceRepository,
+ long memoryBudget) {
+ this.appCtx = appCtx;
+ this.resourceRepository = resourceRepository;
this.indexInfos = new HashMap<>();
this.memoryBudget = memoryBudget;
this.memoryUsed = 0;
@@ -159,11 +168,34 @@
}
@Override
- public void register(String resourcePath, IIndex index) throws HyracksDataException {
+ public IIndex registerIfAbsent(String resourcePath, IIndex index) throws HyracksDataException {
if (indexInfos.containsKey(resourcePath)) {
- throw new HyracksDataException("Index with resource name " + resourcePath + " already exists.");
+ return indexInfos.get(resourcePath).index;
+ }
+ if (index == null) {
+ index = getOrCreate(resourcePath);
}
indexInfos.put(resourcePath, new IndexInfo(index));
+ return index;
+ }
+
+ public IIndex getOrCreate(String resourcePath) throws HyracksDataException {
+ IIndex index = get(resourcePath);
+ if (index == null) {
+ index = readIndex(resourcePath);
+ registerIfAbsent(resourcePath, index);
+ }
+ return index;
+ }
+
+ private IIndex readIndex(String resourcePath) throws HyracksDataException {
+ // Get local resource
+ LocalResource lr = resourceRepository.get(resourcePath);
+ if (lr == null) {
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
+ }
+ IResource resource = lr.getResource();
+ return resource.createInstance(appCtx);
}
@Override
@@ -209,4 +241,24 @@
}
indexInfos.remove(resourcePath);
}
+
+ @Override
+ public void destroy(String resourcePath) throws HyracksDataException {
+ IIndex index = get(resourcePath);
+ if (index != null) {
+ unregister(resourcePath);
+ } else {
+ readIndex(resourcePath);
+ }
+
+ if (getResourceId(resourcePath) != -1) {
+ resourceRepository.delete(resourcePath);
+ }
+ index.destroy();
+ }
+
+ private long getResourceId(String resourcePath) throws HyracksDataException {
+ LocalResource lr = resourceRepository.get(resourcePath);
+ return lr == null ? -1 : lr.getId();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
index 6200680..1d74379 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
@@ -41,10 +41,8 @@
*
* @param resourceId
* @param resource
- * @throws HyracksDataException
- * if a resource is already registered with this resourceId
*/
- public void register(String resourceId, R resource) throws HyracksDataException;
+ public R registerIfAbsent(String resourceId, R resource) throws HyracksDataException;
/**
* Opens a resource. The resource is moved to the open state
@@ -75,8 +73,15 @@
/**
* unregister a resource removing its resources in memory and on disk
*
- * @param resourceId
+ * @param resourcePath
* @throws HyracksDataException
*/
- public void unregister(String resourceId) throws HyracksDataException;
+ public void unregister(String resourcePath) throws HyracksDataException;
+
+ /**
+ * delete the resource
+ * @param resourcePath
+ * @throws HyracksDataException
+ */
+ public void destroy(String resourcePath) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index 0b4d2ed..1328bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -59,7 +59,7 @@
@Override
public IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx) {
- return TestStorageManagerComponentHolder.getIndexLifecycleManager();
+ return TestStorageManagerComponentHolder.getIndexLifecycleManager(ctx);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 7f696b4..1f7c0ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -78,9 +78,9 @@
lcManager = null;
}
- public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() {
+ public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager(INCServiceContext ctx) {
if (lcManager == null) {
- lcManager = new IndexLifecycleManager();
+ lcManager = new IndexLifecycleManager(ctx, getLocalResourceRepository());
}
return lcManager;
}